You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2017/10/04 21:50:49 UTC

[2/2] samza git commit: fixed some merge issues

fixed some merge issues


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/90fa985e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/90fa985e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/90fa985e

Branch: refs/heads/0.13.2
Commit: 90fa985ece6f07dc2520e526c323b67064c8d02a
Parents: 8ee61a6
Author: Boris S <bo...@apache.org>
Authored: Wed Oct 4 14:50:26 2017 -0700
Committer: Boris S <bo...@apache.org>
Committed: Wed Oct 4 14:50:26 2017 -0700

----------------------------------------------------------------------
 .../samza/checkpoint/kafka/KafkaCheckpointManager.scala      | 8 +-------
 .../checkpoint/kafka/KafkaCheckpointManagerFactory.scala     | 2 +-
 2 files changed, 2 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/90fa985e/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 1e22763..f66097c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -29,7 +29,7 @@ import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
 import org.apache.samza.container.TaskName
 import org.apache.samza.serializers.CheckpointSerde
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{StreamSpec, SystemAdmin, _}
+import org.apache.samza.system.{SystemAdmin, _}
 import org.apache.samza.util._
 import org.apache.samza.{Partition, SamzaException}
 
@@ -272,11 +272,5 @@ class KafkaCheckpointManager(
 
   }
 
-  override def clearCheckpoints = {
-    info("Clear checkpoint stream %s in system %s" format (checkpointTopic, systemName))
-    val spec = StreamSpec.createCheckpointStreamSpec(checkpointTopic, systemName)
-    systemAdmin.clearStream(spec)
-  }
-
   override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic)
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/90fa985e/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 402248f..9e8aefb 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -100,7 +100,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
 
     new KafkaCheckpointManager(
       clientId,
-      KafkaUtil.getCheckpointTopic(jobName, jobId, config),
+      KafkaUtil.getCheckpointTopic(jobName, jobId),
       systemName,
       kafkaConfig.getCheckpointReplicationFactor.get.toInt,
       socketTimeout,