You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2017/10/04 21:50:49 UTC
[2/2] samza git commit: fixed some merge issues
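Fixed some merge issues: removed the clearCheckpoints override (and the now-unused StreamSpec import) from KafkaCheckpointManager, and dropped the config argument from the KafkaUtil.getCheckpointTopic call in KafkaCheckpointManagerFactory.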
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/90fa985e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/90fa985e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/90fa985e
Branch: refs/heads/0.13.2
Commit: 90fa985ece6f07dc2520e526c323b67064c8d02a
Parents: 8ee61a6
Author: Boris S <bo...@apache.org>
Authored: Wed Oct 4 14:50:26 2017 -0700
Committer: Boris S <bo...@apache.org>
Committed: Wed Oct 4 14:50:26 2017 -0700
----------------------------------------------------------------------
.../samza/checkpoint/kafka/KafkaCheckpointManager.scala | 8 +-------
.../checkpoint/kafka/KafkaCheckpointManagerFactory.scala | 2 +-
2 files changed, 2 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/90fa985e/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 1e22763..f66097c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -29,7 +29,7 @@ import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
import org.apache.samza.container.TaskName
import org.apache.samza.serializers.CheckpointSerde
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{StreamSpec, SystemAdmin, _}
+import org.apache.samza.system.{SystemAdmin, _}
import org.apache.samza.util._
import org.apache.samza.{Partition, SamzaException}
@@ -272,11 +272,5 @@ class KafkaCheckpointManager(
}
- override def clearCheckpoints = {
- info("Clear checkpoint stream %s in system %s" format (checkpointTopic, systemName))
- val spec = StreamSpec.createCheckpointStreamSpec(checkpointTopic, systemName)
- systemAdmin.clearStream(spec)
- }
-
override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/90fa985e/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 402248f..9e8aefb 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -100,7 +100,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
new KafkaCheckpointManager(
clientId,
- KafkaUtil.getCheckpointTopic(jobName, jobId, config),
+ KafkaUtil.getCheckpointTopic(jobName, jobId),
systemName,
kafkaConfig.getCheckpointReplicationFactor.get.toInt,
socketTimeout,