You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/09/24 23:37:41 UTC
samza git commit: SAMZA-615: Auto-migrate Kafka checkpoints into
coordinator stream
Repository: samza
Updated Branches:
refs/heads/master 9ab00c188 -> 841839de0
SAMZA-615: Auto-migrate Kafka checkpoints into coordinator stream
Squashed commit of the following:
commit ea64b4e9da0c68849d79353121d2577d27feb709
Author: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Date: Tue Sep 22 14:25:52 2015 -0700
SAMZA-615: Fix unit test in TestJobRunner since there is no guaranteed execution ordering between two tests
commit 3d9d897c152818993c426b00ceed755690baf5d1
Author: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Date: Tue Sep 22 11:11:40 2015 -0700
SAMZA-615: Merged w/ SAMZA-731 and removed all write operations in KafkaCheckpointManager
commit 4d35dc84614374f6efdb911cb9ca7ce18b1f2c76
Author: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Date: Wed Sep 16 13:37:11 2015 -0700
SAMZA-615: Add auto-checkpoint migration for Kafka checkpoints
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/841839de
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/841839de
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/841839de
Branch: refs/heads/master
Commit: 841839de0031f4dc743e1a6f1e795dc18cc13fbc
Parents: 9ab00c1
Author: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Authored: Thu Sep 24 14:36:23 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Thu Sep 24 14:36:46 2015 -0700
----------------------------------------------------------------------
build.gradle | 2 +
checkstyle/import-control.xml | 4 +
.../stream/CoordinatorStreamSystemConsumer.java | 1 +
.../stream/CoordinatorStreamSystemProducer.java | 1 +
.../messages/SetMigrationMetaMessage.java | 51 +++
.../apache/samza/migration/MigrationPlan.java | 27 ++
.../scala/org/apache/samza/job/JobRunner.scala | 4 +
.../samza/migration/JobRunnerMigration.scala | 52 +++
.../MockCoordinatorStreamSystemFactory.java | 17 +-
.../MockCoordinatorStreamWrappedConsumer.java | 5 +
.../samza/storage/TestStorageRecovery.java | 8 +
.../resources/test-migration-fail.properties | 26 ++
.../org/apache/samza/job/TestJobRunner.scala | 28 ++
.../old/checkpoint/KafkaCheckpointLogKey.scala | 188 ++++++++
.../old/checkpoint/KafkaCheckpointManager.scala | 337 +++++++++++++++
.../KafkaCheckpointManagerFactory.scala | 108 +++++
.../checkpoint/KafkaCheckpointMigration.scala | 95 ++++
.../system/kafka/KafkaSystemProducer.scala | 1 +
.../checkpoint/TestKafkaCheckpointManager.scala | 430 +++++++++++++++++++
19 files changed, 1383 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 3a7fabc..0c38a76 100644
--- a/build.gradle
+++ b/build.gradle
@@ -134,6 +134,7 @@ project(":samza-core_$scalaVersion") {
// Force scala joint compilation
sourceSets.main.scala.srcDir "src/main/java"
+ sourceSets.test.scala.srcDir "src/test/java"
sourceSets.main.java.srcDirs = []
jar {
@@ -233,6 +234,7 @@ project(":samza-kafka_$scalaVersion") {
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
+ testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
// Logging in tests is good.
testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index bc07ae8..53cb8b4 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -52,6 +52,10 @@
</subpackage>
</subpackage>
+ <subpackage name="migration">
+ <allow pkg="org.apache.samza.config" />
+ </subpackage>
+
<subpackage name="job">
<allow pkg="org.apache.samza.config" />
<allow pkg="org.apache.samza.coordinator.stream" />
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index 79291af..3113f09 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -127,6 +127,7 @@ public class CoordinatorStreamSystemConsumer {
public void stop() {
log.info("Stopping coordinator stream system consumer.");
systemConsumer.stop();
+ isStarted = false;
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
index 42ae00b..36cf759 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
@@ -91,6 +91,7 @@ public class CoordinatorStreamSystemProducer {
public void stop() {
log.info("Stopping coordinator stream producer.");
systemProducer.stop();
+ isStarted = false;
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetMigrationMetaMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetMigrationMetaMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetMigrationMetaMessage.java
new file mode 100644
index 0000000..70fbcb5
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetMigrationMetaMessage.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+/**
+ * The SetMigrationMetaMessage is used to record the migrations that have been performed.
+ * The structure looks like:
+ * {
+ * Key: migration-info
+ * Type: set-migration-info
+ * Source: ContainerID
+ * MessageMap:
+ * {
+ * "0910checkpointmigration" : true
+ * }
+ * }
+ */
+public class SetMigrationMetaMessage extends CoordinatorStreamMessage {
+ public static final String TYPE = "set-migration-info";
+
+ public SetMigrationMetaMessage(CoordinatorStreamMessage message) {
+ super(message.getKeyArray(), message.getMessageMap());
+ }
+
+ public SetMigrationMetaMessage(String source, String metaInfoKey, String metaInfoVal) {
+ super(source);
+ setType(TYPE);
+ setKey("migration-info");
+ putMessageValue(metaInfoKey, metaInfoVal);
+ }
+
+ public String getMetaInfo(String metaInfoKey) {
+ return getMessageValues().get(metaInfoKey);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/main/java/org/apache/samza/migration/MigrationPlan.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/migration/MigrationPlan.java b/samza-core/src/main/java/org/apache/samza/migration/MigrationPlan.java
new file mode 100644
index 0000000..c6ff833
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/migration/MigrationPlan.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.migration;
+
+import org.apache.samza.config.Config;
+
+
+public interface MigrationPlan {
+ void migrate(Config config);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 1673217..d6109ec 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -24,6 +24,7 @@ import org.apache.samza.config.Config
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
import org.apache.samza.job.ApplicationStatus.Running
+import org.apache.samza.migration.JobRunnerMigration
import org.apache.samza.util.CommandLine
import org.apache.samza.util.Logging
import org.apache.samza.util.Util
@@ -98,6 +99,9 @@ class JobRunner(config: Config) extends Logging {
}
coordinatorSystemProducer.stop
+ // Run any migration plans that need to be performed by the job runner
+ JobRunnerMigration(config)
+
// Create the actual job, and submit it.
val job = jobFactory.getJob(config).submit
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala b/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
new file mode 100644
index 0000000..374e27e
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.migration
+
+import org.apache.samza.config.Config
+import org.apache.samza.util.{Util, Logging}
+import org.apache.samza.SamzaException
+
+
+object JobRunnerMigration {
+ val CHECKPOINTMIGRATION = "old.checkpoint.KafkaCheckpointMigration"
+ def apply(config: Config) = {
+ val migration = new JobRunnerMigration
+ migration.checkpointMigration(config)
+ }
+}
+
+class JobRunnerMigration extends Logging {
+
+ def checkpointMigration(config: Config) = {
+ val checkpointFactory = Option(config.get("task.checkpoint.factory"))
+ checkpointFactory match {
+ case Some("org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory") =>
+ info("Performing checkpoint migration")
+ val checkpointMigrationPlan = Util.getObj[MigrationPlan](JobRunnerMigration.CHECKPOINTMIGRATION)
+ checkpointMigrationPlan.migrate(config)
+ case None =>
+ info("No task.checkpoint.factory defined, not performing any checkpoint migration")
+ case _ =>
+ val errorMsg = "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " +
+ "for everything else, please use the checkpoint tool and remove task.checkpoint.factory configuration"
+ error(errorMsg)
+ throw new SamzaException(errorMsg)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
index 84ae0b5..9d8c98e 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -29,9 +29,11 @@ import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
import org.apache.samza.util.Util;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -115,7 +117,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
public MockSystemProducer(String expectedSource) {
this.expectedSource = expectedSource;
- this.envelopes = new ArrayList<OutgoingMessageEnvelope>();
+ this.envelopes = new ArrayList<>();
}
@@ -132,7 +134,18 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
}
public void send(String source, OutgoingMessageEnvelope envelope) {
- envelopes.add(envelope);
+ if (mockConsumer != null) {
+ MockCoordinatorStreamWrappedConsumer consumer = (MockCoordinatorStreamWrappedConsumer) mockConsumer;
+ SystemStreamPartition ssp = new SystemStreamPartition(envelope.getSystemStream(), new Partition(0));
+ consumer.register(ssp, "");
+ try {
+ consumer.addMessageEnvelope(new IncomingMessageEnvelope(ssp, "", envelope.getKey(), envelope.getMessage()));
+ } catch (IOException | InterruptedException e) {
+ e.printStackTrace();
+ }
+ } else {
+ envelopes.add(envelope);
+ }
}
public void flush(String source) {
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
index 47a44b1..dd04d28 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
@@ -19,6 +19,7 @@
package org.apache.samza.coordinator.stream;
+import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -67,6 +68,10 @@ public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap {
convertConfigToCoordinatorMessage(config);
}
+ public void addMessageEnvelope(IncomingMessageEnvelope envelope) throws IOException, InterruptedException {
+ put(systemStreamPartition, envelope);
+ setIsAtHead(systemStreamPartition, true);
+ }
private void convertConfigToCoordinatorMessage(Config config) {
try {
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
index 52d450d..53207ad 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
@@ -35,6 +35,7 @@ import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
import org.apache.samza.system.SystemStreamPartition;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -70,8 +71,15 @@ public class TestStorageRecovery {
when(systemAdmin.getSystemStreamMetadata(set2)).thenReturn(ssmMap);
}
+ @After
+ public void teardown() {
+ MockCoordinatorStreamSystemFactory.disableMockConsumerCache();
+ }
+
@Test
public void testStorageEngineReceivedAllValues() {
+ MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
+
String path = "/tmp/testing";
StorageRecovery storageRecovery = new StorageRecovery(config, path);
storageRecovery.run();
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/test/resources/test-migration-fail.properties
----------------------------------------------------------------------
diff --git a/samza-core/src/test/resources/test-migration-fail.properties b/samza-core/src/test/resources/test-migration-fail.properties
new file mode 100644
index 0000000..b0657de
--- /dev/null
+++ b/samza-core/src/test/resources/test-migration-fail.properties
@@ -0,0 +1,26 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+job.factory.class=org.apache.samza.job.MockJobFactory
+job.name=test-job
+foo=bar
+systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+task.checkpoint.factory=org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
index 52057ed..9036e81 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -21,8 +21,11 @@ package org.apache.samza.job
import java.io.File
+import org.apache.samza.SamzaException
import org.apache.samza.config.Config
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
import org.junit.Test
+import org.junit.After
import org.junit.Assert._
object TestJobRunner {
@@ -30,8 +33,33 @@ object TestJobRunner {
}
class TestJobRunner {
+
+ @After
+ def teardown {
+ MockCoordinatorStreamSystemFactory.disableMockConsumerCache()
+ }
+
+ @Test
+ def testJobRunnerMigrationFails {
+ MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
+ try {
+ JobRunner.main(Array(
+ "--config-factory",
+ "org.apache.samza.config.factories.PropertiesConfigFactory",
+ "--config-path",
+ "file://%s/src/test/resources/test-migration-fail.properties" format new File(".").getCanonicalPath))
+ fail("Should have failed already.")
+ } catch {
+ case se: SamzaException => assertEquals(se.getMessage, "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " +
+ "for everything else, please use the checkpoint tool and remove task.checkpoint.factory configuration")
+ }
+ }
+
@Test
def testJobRunnerWorks {
+ MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
JobRunner.main(Array(
"--config-factory",
"org.apache.samza.config.factories.PropertiesConfigFactory",
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala
new file mode 100644
index 0000000..958d07c
--- /dev/null
+++ b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package old.checkpoint
+
+import java.util
+
+import org.apache.samza.SamzaException
+import org.apache.samza.container.TaskName
+import org.codehaus.jackson.`type`.TypeReference
+import org.codehaus.jackson.map.ObjectMapper
+
+import scala.collection.JavaConversions._
+
+/**
+ * Kafka Checkpoint Log-specific key used to identify what type of entry is
+ * written for any particular log entry.
+ *
+ * @param map Backing map to hold key values
+ */
+class KafkaCheckpointLogKey private (val map: Map[String, String]) {
+ // This might be better as a case class...
+ import KafkaCheckpointLogKey._
+
+ /**
+ * Serialize this key to bytes
+ * @return Key as bytes
+ */
+ def toBytes(): Array[Byte] = {
+ val jMap = new util.HashMap[String, String](map.size)
+ jMap.putAll(map)
+
+ JSON_MAPPER.writeValueAsBytes(jMap)
+ }
+
+ private def getKey = map.getOrElse(CHECKPOINT_KEY_KEY, throw new SamzaException("No " + CHECKPOINT_KEY_KEY + " in map for Kafka Checkpoint log key"))
+
+ /**
+ * Is this key for a checkpoint entry?
+ *
+ * @return true iff this key's entry is for a checkpoint
+ */
+ def isCheckpointKey = getKey.equals(CHECKPOINT_KEY_TYPE)
+
+ /**
+ * Is this key for a changelog partition mapping?
+ *
+ * @return true iff this key's entry is for a changelog partition mapping
+ */
+ def isChangelogPartitionMapping = getKey.equals(CHANGELOG_PARTITION_KEY_TYPE)
+
+ /**
+ * If this Key is for a checkpoint entry, return its associated TaskName.
+ *
+ * @return TaskName for this checkpoint or throw an exception if this key does not have a TaskName entry
+ */
+ def getCheckpointTaskName = {
+ val asString = map.getOrElse(CHECKPOINT_TASKNAME_KEY, throw new SamzaException("No TaskName in checkpoint key: " + this))
+ new TaskName(asString)
+ }
+
+ def canEqual(other: Any): Boolean = other.isInstanceOf[KafkaCheckpointLogKey]
+
+ override def equals(other: Any): Boolean = other match {
+ case that: KafkaCheckpointLogKey =>
+ (that canEqual this) &&
+ map == that.map
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ val state = Seq(map)
+ state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+ }
+}
+
+object KafkaCheckpointLogKey {
+ /**
+ * Messages in the checkpoint log have keys associated with them. These keys are maps that describe the message's
+ * type, either a checkpoint or a changelog-partition-mapping.
+ */
+ val CHECKPOINT_KEY_KEY = "type"
+ val CHECKPOINT_KEY_TYPE = "checkpoint"
+ val CHANGELOG_PARTITION_KEY_TYPE = "changelog-partition-mapping"
+ val CHECKPOINT_TASKNAME_KEY = "taskName"
+ val SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY = "systemstreampartition-grouper-factory"
+
+ /**
+ * Partition mapping keys have no dynamic values, so we just need one instance.
+ */
+ val CHANGELOG_PARTITION_MAPPING_KEY = new KafkaCheckpointLogKey(Map(CHECKPOINT_KEY_KEY -> CHANGELOG_PARTITION_KEY_TYPE))
+
+ private val JSON_MAPPER = new ObjectMapper()
+ val KEY_TYPEREFERENCE = new TypeReference[util.HashMap[String, String]]() {}
+
+ var systemStreamPartitionGrouperFactoryString:Option[String] = None
+
+ /**
+ * Set the name of the factory configured to provide the SystemStreamPartition grouping
+ * so it can be included in the key.
+ *
+ * @param str Config value of SystemStreamPartition Grouper Factory
+ */
+ def setSystemStreamPartitionGrouperFactoryString(str:String) = {
+ systemStreamPartitionGrouperFactoryString = Some(str)
+ }
+
+ /**
+ * Get the name of the factory configured to provide the SystemStreamPartition grouping
+ * so it can be included in the key.
+ */
+ def getSystemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString.getOrElse(throw new SamzaException("No SystemStreamPartition grouping factory string has been set."))
+
+ /**
+ * Build a key for a checkpoint log entry for a particular TaskName
+ * @param taskName TaskName to build for this checkpoint entry
+ *
+ * @return Key for checkpoint log entry
+ */
+ def getCheckpointKey(taskName:TaskName) = {
+ val map = Map(CHECKPOINT_KEY_KEY -> CHECKPOINT_KEY_TYPE,
+ CHECKPOINT_TASKNAME_KEY -> taskName.getTaskName,
+ SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY -> getSystemStreamPartitionGrouperFactoryString)
+
+ new KafkaCheckpointLogKey(map)
+ }
+
+ /**
+ * Build a key for a changelog partition mapping entry
+ *
+ * @return Key for changelog partition mapping entry
+ */
+ def getChangelogPartitionMappingKey() = CHANGELOG_PARTITION_MAPPING_KEY
+
+ /**
+ * Deserialize a Kafka checkpoint log key
+ * @param bytes Serialized (via JSON) Kafka checkpoint log key
+ * @return Checkpoint log key
+ */
+ def fromBytes(bytes: Array[Byte]): KafkaCheckpointLogKey = {
+ try {
+ val jmap: util.HashMap[String, String] = JSON_MAPPER.readValue(bytes, KEY_TYPEREFERENCE)
+
+ if(!jmap.containsKey(CHECKPOINT_KEY_KEY)) {
+ throw new SamzaException("No type entry in checkpoint key: " + jmap)
+ }
+
+ // Only checkpoint keys have ssp grouper factory keys
+ if(jmap.get(CHECKPOINT_KEY_KEY).equals(CHECKPOINT_KEY_TYPE)) {
+ val sspGrouperFactory = jmap.get(SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY)
+
+ if (sspGrouperFactory == null) {
+ throw new SamzaException("No SystemStreamPartition Grouper factory entry in checkpoint key: " + jmap)
+ }
+
+ if (!sspGrouperFactory.equals(getSystemStreamPartitionGrouperFactoryString)) {
+ throw new DifferingSystemStreamPartitionGrouperFactoryValues(sspGrouperFactory, getSystemStreamPartitionGrouperFactoryString)
+ }
+ }
+
+ new KafkaCheckpointLogKey(jmap.toMap)
+ } catch {
+ case e: Exception =>
+ throw new SamzaException("Exception while deserializing checkpoint key", e)
+ }
+ }
+}
+
+class DifferingSystemStreamPartitionGrouperFactoryValues(inKey:String, inConfig:String) extends SamzaException {
+ override def getMessage() = "Checkpoint key's SystemStreamPartition Grouper factory (" + inKey +
+ ") does not match value from current configuration (" + inConfig + "). " +
+ "This likely means the SystemStreamPartitionGrouper was changed between job runs, which is not supported."
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala
new file mode 100644
index 0000000..627631a
--- /dev/null
+++ b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package old.checkpoint
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.Properties
+
+import kafka.admin.AdminUtils
+import kafka.api._
+import kafka.common.{ErrorMapping, InvalidMessageSizeException, TopicAndPartition, TopicExistsException, UnknownTopicOrPartitionException}
+import kafka.consumer.SimpleConsumer
+import kafka.message.InvalidMessageException
+import kafka.utils.Utils
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
+import org.apache.samza.container.TaskName
+import org.apache.samza.serializers.CheckpointSerde
+import org.apache.samza.system.kafka.TopicMetadataCache
+import org.apache.samza.util.{ExponentialSleepStrategy, KafkaUtil, Logging, TopicMetadataStore}
+
+import scala.collection.mutable
+
+/**
+ * Kafka checkpoint manager is used to store checkpoints in a Kafka topic.
+ * To read a checkpoint for a specific taskName, we find the newest message
+ * keyed to that taskName. If there is no such message, no checkpoint data
+ * exists. The underlying log has a single partition into which all
+ * checkpoints and TaskName to changelog partition mappings are written.
+ */
+class KafkaCheckpointManager(clientId: String,
+ checkpointTopic: String,
+ systemName: String,
+ socketTimeout: Int,
+ bufferSize: Int,
+ fetchSize: Int,
+ metadataStore: TopicMetadataStore,
+ connectProducer: () => Producer[Array[Byte], Array[Byte]],
+ connectZk: () => ZkClient,
+ systemStreamPartitionGrouperFactoryString: String,
+ retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
+ serde: CheckpointSerde = new CheckpointSerde,
+ checkpointTopicProperties: Properties = new Properties) extends Logging {
+ import KafkaCheckpointManager._
+
+ var taskNames = Set[TaskName]()
+ var producer: Producer[Array[Byte], Array[Byte]] = null
+ var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
+
+ var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint
+
+ KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString)
+
+ info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName))
+
+ /**
+  * Builds a SimpleConsumer connected to the current leader of partition 0 of the checkpoint topic.
+  *
+  * @return a newly created consumer
+  * @throws KafkaCheckpointException if the topic metadata has no entry for partition 0
+  * @throws SamzaException if partition 0 has no leader available
+  */
+ private def getConsumer(): SimpleConsumer = {
+   val metadataByTopic = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
+   val topicMetadata = metadataByTopic(checkpointTopic)
+
+   val partitionZero = topicMetadata.partitionsMetadata
+     .find(_.partitionId == 0)
+     .getOrElse(throw new KafkaCheckpointException("Tried to find partition information for partition 0 for checkpoint topic, but it didn't exist in Kafka."))
+
+   val leaderBroker = partitionZero.leader
+     .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic))
+
+   info("Connecting to leader %s:%d for topic %s and to fetch all checkpoint messages." format(leaderBroker.host, leaderBroker.port, checkpointTopic))
+
+   new SimpleConsumer(leaderBroker.host, leaderBroker.port, socketTimeout, bufferSize, clientId)
+ }
+
+ /** Asks the given consumer for the earliest available offset of the given topic partition. */
+ private def getEarliestOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition): Long = {
+   consumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, -1)
+ }
+
+ /**
+  * Requests a single offset for the given topic partition.
+  *
+  * @param consumer          consumer used to issue the offset request
+  * @param topicAndPartition partition whose offset is requested
+  * @param earliestOrLatest  time marker handed to the offset request (e.g. OffsetRequest.LatestTime)
+  * @return the first offset contained in the response
+  * @throws KafkaCheckpointException if the response has no entry for the partition or lists no offsets
+  */
+ private def getOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition, earliestOrLatest: Long): Long = {
+   val request = new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
+   val partitionResponse = consumer.getOffsetsBefore(request)
+     .partitionErrorAndOffsets
+     .getOrElse(topicAndPartition, throw new KafkaCheckpointException("Unable to find offset information for %s:0" format checkpointTopic))
+
+   // Fail or retry if there was an issue with the offset request.
+   KafkaUtil.maybeThrowException(partitionResponse.error)
+
+   partitionResponse.offsets
+     .headOption
+     .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:0" format checkpointTopic))
+ }
+
+ /**
+  * Returns the most recent checkpoint known for the given task, re-reading the checkpoint
+  * log on every call and merging what it finds into the cached task-to-checkpoint map.
+  *
+  * @param taskName Specific Samza taskName for which to get the last checkpoint of.
+  * @return the latest checkpoint for the task, or null when none has been read for it
+  * @throws SamzaException if the task was never registered with this manager
+  */
+ def readLastCheckpoint(taskName: TaskName): Checkpoint = {
+   if (!taskNames.contains(taskName)) {
+     throw new SamzaException(taskName + " not registered with this CheckpointManager")
+   }
+
+   info("Reading checkpoint for taskName " + taskName)
+
+   Option(taskNamesToOffsets) match {
+     case None =>
+       info("No TaskName to checkpoint mapping provided. Reading for first time.")
+       taskNamesToOffsets = readCheckpointsFromLog()
+     case Some(knownCheckpoints) =>
+       info("Already existing checkpoint mapping. Merging new offsets")
+       taskNamesToOffsets = knownCheckpoints ++ readCheckpointsFromLog()
+   }
+
+   val latest = taskNamesToOffsets.get(taskName).orNull
+
+   info("Got checkpoint state for taskName %s: %s" format(taskName, latest))
+
+   latest
+ }
+
+ /**
+  * Reads the checkpoint log, ignoring changelog-mapping entries, and collects the
+  * checkpoint seen last for each TaskName.
+  *
+  * @return immutable map from TaskName to the last checkpoint read for it
+  */
+ def readCheckpointsFromLog(): Map[TaskName, Checkpoint] = {
+   val latestByTask = mutable.Map[TaskName, Checkpoint]()
+
+   readLog(
+     CHECKPOINT_LOG4J_ENTRY,
+     (key: KafkaCheckpointLogKey) => key.isCheckpointKey,
+     (payload: ByteBuffer, key: KafkaCheckpointLogKey) => {
+       val taskName = key.getCheckpointTaskName
+       val checkpoint = serde.fromBytes(Utils.readBytes(payload))
+       debug("Adding checkpoint " + checkpoint + " for taskName " + taskName)
+       // Later entries overwrite earlier ones for the same task.
+       latestByTask(taskName) = checkpoint
+     })
+
+   latestByTask.toMap /* of the immutable kind */
+ }
+
+ /**
+  * Reads the checkpoint log, ignoring checkpoint entries, and returns the changelog
+  * partition mapping that was seen last.
+  *
+  * Lots of duplicated code from the checkpoint method, but will be better to refactor this code into AM-based
+  * checkpoint log reading
+  *
+  * @return the last mapping read, or an empty map when the log holds no mapping entry
+  */
+ def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = {
+   var latestMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
+
+   readLog(
+     CHANGELOG_PARTITION_MAPPING_LOG4j,
+     (key: KafkaCheckpointLogKey) => key.isChangelogPartitionMapping,
+     (payload: ByteBuffer, key: KafkaCheckpointLogKey) => {
+       // Each mapping entry replaces the previous one wholesale.
+       latestMapping = serde.changelogPartitionMappingFromBytes(Utils.readBytes(payload))
+
+       debug("Adding changelog partition mapping" + latestMapping)
+     })
+
+   latestMapping
+ }
+
+ /**
+ * Common code for reading both checkpoints and the changelog partition mapping from
+ * partition 0 of the checkpoint topic.
+ *
+ * Reading begins at `startingOffset` when an earlier call recorded one, otherwise at the
+ * earliest offset, and continues up to the latest offset fetched at the start of the attempt.
+ * The whole attempt is executed through `retryBackoff`.
+ *
+ * @param entryType What type of entry to look for within the log keys (only used in log and error messages)
+ * @param shouldHandleEntry Predicate on the decoded key; entries for which it returns false are skipped
+ * @param handleEntry Code to handle an entry in the log once it's found
+ */
+ private def readLog(entryType:String, shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
+ handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = {
+ retryBackoff.run[Unit](
+ loop => {
+ // A new consumer is created for every attempt and closed in the finally block below.
+ val consumer = getConsumer()
+
+ val topicAndPartition = new TopicAndPartition(checkpointTopic, 0)
+
+ try {
+ var offset = startingOffset.getOrElse(getEarliestOffset(consumer, topicAndPartition))
+
+ info("Got offset %s for topic %s and partition 0. Attempting to fetch messages for %s." format(offset, checkpointTopic, entryType))
+
+ val latestOffset = getOffset(consumer, topicAndPartition, OffsetRequest.LatestTime)
+
+ info("Get latest offset %s for topic %s and partition 0." format(latestOffset, checkpointTopic))
+
+ // NOTE(review): the guard tests for a negative offset while the message talks about offset 0 - confirm which is intended.
+ if (offset < 0) {
+ info("Got offset 0 (no messages in %s) for topic %s and partition 0, so returning empty collection. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (entryType, checkpointTopic))
+ // Non-local return: leaves readLog itself, not just this closure.
+ return
+ }
+
+ // NOTE(review): offset only advances when a fetch yields messages; an empty message set while
+ // offset < latestOffset would repeat the same fetch - confirm this cannot happen.
+ while (offset < latestOffset) {
+ val request = new FetchRequestBuilder()
+ .addFetch(checkpointTopic, 0, offset, fetchSize)
+ .maxWait(500)
+ .minBytes(1)
+ .clientId(clientId)
+ .build
+
+ val fetchResponse = consumer.fetch(request)
+ if (fetchResponse.hasError) {
+ warn("Got error code from broker for %s: %s" format(checkpointTopic, fetchResponse.errorCode(checkpointTopic, 0)))
+ val errorCode = fetchResponse.errorCode(checkpointTopic, 0)
+ // An out-of-range offset ends the read quietly; any other error code is handed to KafkaUtil.maybeThrowException.
+ if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
+ warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (entryType, checkpointTopic))
+ return
+ }
+ KafkaUtil.maybeThrowException(errorCode)
+ }
+
+ for (response <- fetchResponse.messageSet(checkpointTopic, 0)) {
+ // Progress is recorded before the entry is handled, so a later readLog call resumes after this message.
+ offset = response.nextOffset
+ startingOffset = Some(offset) // For next time we call
+
+ if (!response.message.hasKey) {
+ throw new KafkaCheckpointException("Encountered message without key.")
+ }
+
+ val checkpointKey = KafkaCheckpointLogKey.fromBytes(Utils.readBytes(response.message.key))
+
+ if (!shouldHandleEntry(checkpointKey)) {
+ debug("Skipping " + entryType + " entry with key " + checkpointKey)
+ } else {
+ handleEntry(response.message.payload, checkpointKey)
+ }
+ }
+ }
+ } finally {
+ consumer.close()
+ }
+
+ loop.done
+ Unit
+ },
+
+ (exception, loop) => {
+ // The first four cases are rethrown as hard failures; anything else is only logged here
+ // (per the log text, the attempt is then retried).
+ exception match {
+ case e: InvalidMessageException => throw new KafkaCheckpointException("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e)
+ case e: InvalidMessageSizeException => throw new KafkaCheckpointException("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e)
+ case e: UnknownTopicOrPartitionException => throw new KafkaCheckpointException("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e)
+ case e: KafkaCheckpointException => throw e
+ case e: Exception =>
+ warn("While trying to read last %s entry for topic %s and partition 0: %s. Retrying." format(entryType, checkpointTopic, e))
+ debug("Exception detail:", e)
+ }
+ }
+ // If run(...) yields no result this is reported as a failure (presumably when the retry loop ends without success - TODO confirm).
+ ).getOrElse(throw new SamzaException("Failed to get entries for " + entryType + " from topic " + checkpointTopic))
+
+ }
+
+ /**
+  * Checks whether the checkpoint topic exists, using a ZooKeeper client that is opened
+  * for this call and closed again afterwards.
+  */
+ def topicExists: Boolean = {
+   val zk = connectZk()
+   try AdminUtils.topicExists(zk, checkpointTopic)
+   finally zk.close()
+ }
+
+ /**
+  * Validates the checkpoint topic if it exists.
+  *
+  * @throws SamzaException if the checkpoint topic does not exist
+  */
+ def start: Unit = {
+   if (!topicExists) {
+     throw new SamzaException("Failed to start KafkaCheckpointManager for non-existing checkpoint topic. KafkaCheckpointManager should only be used for migration purpose.")
+   }
+   validateTopic
+ }
+
+ /** Adds the task to the set of task names that readLastCheckpoint accepts. */
+ def register(taskName: TaskName): Unit = {
+   debug("Adding taskName " + taskName + " to " + this)
+   taskNames = taskNames + taskName
+ }
+
+ /** Closes the producer if one has been assigned; does nothing otherwise. */
+ def stop = {
+   Option(producer).foreach(_.close)
+ }
+
+ /**
+  * Verifies, through `retryBackoff`, that the checkpoint topic's metadata carries no error
+  * and that the topic has exactly one partition.
+  *
+  * A KafkaCheckpointException (wrong partition count) is rethrown; any other exception is
+  * only logged by the error handler (per the log text, the check is then retried).
+  */
+ def validateTopic = {
+   info("Validating checkpoint topic %s." format checkpointTopic)
+   retryBackoff.run(
+     loop => {
+       val metadataByTopic = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, metadataStore.getTopicInfo)
+       val checkpointTopicMetadata = metadataByTopic(checkpointTopic)
+       KafkaUtil.maybeThrowException(checkpointTopicMetadata.errorCode)
+
+       val partitionCount = checkpointTopicMetadata.partitionsMetadata.length
+       if (partitionCount != 1) {
+         throw new KafkaCheckpointException("Checkpoint topic validation failed for topic %s because partition count %s did not match expected partition count of 1." format(checkpointTopic, partitionCount))
+       }
+
+       info("Successfully validated checkpoint topic %s." format checkpointTopic)
+       loop.done
+     },
+
+     (exception, loop) => {
+       exception match {
+         case hardFailure: KafkaCheckpointException => throw hardFailure
+         case other: Exception =>
+           warn("While trying to validate topic %s: %s. Retrying." format(checkpointTopic, other))
+           debug("Exception detail:", other)
+       }
+     }
+   )
+ }
+
+ /** Short description naming the system and checkpoint topic of this manager. */
+ override def toString = s"KafkaCheckpointManager [systemName=$systemName, checkpointTopic=$checkpointTopic]"
+}
+
+ /** Labels passed to readLog to name the kind of entry being read in log and error messages. */
+ object KafkaCheckpointManager {
+   /** Label used when reading checkpoint entries. */
+   val CHECKPOINT_LOG4J_ENTRY: String = "checkpoint log"
+
+   /** Label used when reading changelog partition mapping entries. */
+   val CHANGELOG_PARTITION_MAPPING_LOG4j: String = "changelog partition mapping"
+ }
+
+/**
+ * KafkaCheckpointManager handles retries, so we need two kinds of exceptions:
+ * one to signal a hard failure, and the other to retry. The
+ * KafkaCheckpointException is thrown to indicate a hard failure that the Kafka
+ * CheckpointManager can't recover from.
+ */
+class KafkaCheckpointException(s: String, t: Throwable) extends SamzaException(s, t) {
+ def this(s: String) = this(s, null)
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala
new file mode 100644
index 0000000..189752a
--- /dev/null
+++ b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package old.checkpoint
+
+import java.util.Properties
+
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.config.Config
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, Logging}
+
+ /** Constants and topic settings shared by the legacy Kafka checkpoint manager factory. */
+ object KafkaCheckpointManagerFactory {
+   /**
+    * Version number to track the format of the checkpoint log
+    */
+   val CHECKPOINT_LOG_VERSION_NUMBER = 1
+
+   /** Producer settings that are always applied to the checkpoint producer configuration. */
+   val INJECTED_PRODUCER_PROPERTIES = Map(
+     "acks" -> "all",
+     // Forcibly disable compression because Kafka doesn't support compression
+     // on log compacted topics. Details in SAMZA-586.
+     "compression.type" -> "none")
+
+   /**
+    * Topic-level settings for the checkpoint topic: log compaction plus a very small
+    * segment size. This keeps job startup time small since there are fewer useless
+    * (overwritten) messages to read from the checkpoint topic.
+    *
+    * @param config job configuration (not consulted by the current implementation)
+    * @return a new Properties instance holding the topic settings
+    */
+   def getCheckpointTopicProperties(config: Config) = {
+     val topicProperties = new Properties
+     topicProperties.put("cleanup.policy", "compact")
+     topicProperties.put("segment.bytes", "26214400")
+     topicProperties
+   }
+ }
+
+ /** Builds the legacy KafkaCheckpointManager (used for checkpoint migration) from job configuration. */
+ class KafkaCheckpointManagerFactory extends Logging {
+   import KafkaCheckpointManagerFactory._
+
+   /**
+    * Creates a KafkaCheckpointManager for the system named by `task.checkpoint.system`.
+    *
+    * @param config   job configuration
+    * @param registry metrics registry (not used by the current implementation)
+    * @throws SamzaException if the checkpoint system, zookeeper.connect or the job name is missing
+    */
+   def getCheckpointManager(config: Config, registry: MetricsRegistry): KafkaCheckpointManager = {
+     val clientId = KafkaUtil.getClientId("samza-checkpoint-manager", config)
+     val systemName = Option(config.get("task.checkpoint.system"))
+       .getOrElse(throw new SamzaException("no system defined for checkpoint manager, cannot perform migration."))
+
+     val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, INJECTED_PRODUCER_PROPERTIES)
+     val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+     val socketTimeout = consumerConfig.socketTimeoutMs
+
+     val zkConnect = Option(consumerConfig.zkConnect)
+       .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
+     val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs"))
+     val jobId = config.getJobId.getOrElse("1")
+
+     new KafkaCheckpointManager(
+       clientId = clientId,
+       checkpointTopic = getTopic(jobName, jobId),
+       systemName = systemName,
+       socketTimeout = socketTimeout,
+       bufferSize = consumerConfig.socketReceiveBufferBytes,
+       fetchSize = consumerConfig.fetchMessageMaxBytes, // must be > buffer size
+       metadataStore = new ClientUtilTopicMetadataStore(producerConfig.bootsrapServers, clientId, socketTimeout),
+       connectProducer = () => new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties),
+       connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
+       // The SSPGrouperFactory class is passed along so it can be included/verified in the key
+       systemStreamPartitionGrouperFactoryString = config.getSystemStreamPartitionGrouperFactory,
+       checkpointTopicProperties = getCheckpointTopicProperties(config))
+   }
+
+   /** Derives the checkpoint topic name from job name and id, replacing underscores in both with dashes. */
+   private def getTopic(jobName: String, jobId: String) = {
+     val safeJobName = jobName.replaceAll("_", "-")
+     val safeJobId = jobId.replaceAll("_", "-")
+     "__samza_checkpoint_ver_%d_for_%s_%s" format (CHECKPOINT_LOG_VERSION_NUMBER, safeJobName, safeJobId)
+   }
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala
new file mode 100644
index 0000000..2e10815
--- /dev/null
+++ b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package old.checkpoint
+
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
+import org.apache.samza.config.Config
+import org.apache.samza.container.TaskName
+import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.coordinator.stream.messages.{CoordinatorStreamMessage, SetMigrationMetaMessage}
+import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamSystemFactory}
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.migration.MigrationPlan
+import org.apache.samza.storage.ChangelogPartitionManager
+import org.apache.samza.util.{Logging, NoOpMetricsRegistry}
+import scala.collection.JavaConverters._
+
+ /**
+  * Migration plan that copies checkpoints and the changelog partition mapping from the
+  * legacy Kafka checkpoint topic into the coordinator stream, and records completion
+  * with a SetMigrationMetaMessage.
+  */
+ class KafkaCheckpointMigration extends MigrationPlan with Logging {
+   val source = "CHECKPOINTMIGRATION"
+   val migrationKey = "CheckpointMigration09to10"
+   val migrationVal = "true"
+
+   /**
+    * Runs the migration.
+    *
+    * When the checkpoint topic does not exist, only the completion marker is written.
+    * When it exists, both logs are read first; if the coordinator stream already holds the
+    * completion marker nothing is written, otherwise the data is copied and the marker sent.
+    *
+    * @param config     job configuration used to build the coordinator stream producer and consumer
+    * @param getManager supplier of legacy checkpoint managers; it is invoked once per log that is read
+    */
+   def migrate(config: Config, getManager:() => KafkaCheckpointManager): Unit = {
+     val coordinatorFactory = new CoordinatorStreamSystemFactory
+     val coordinatorSystemProducer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
+     val checkpointReader = getManager()
+
+     // make sure to validate that we only perform migration when the checkpoint topic exists
+     if (!checkpointReader.topicExists) {
+       migrationCompletionMark(coordinatorSystemProducer)
+     } else {
+       checkpointReader.validateTopic
+       val checkpointMap = checkpointReader.readCheckpointsFromLog()
+       checkpointReader.stop
+
+       // A second manager instance is used for the changelog mapping.
+       val changelogReader = getManager()
+       val changelogMap = changelogReader.readChangeLogPartitionMapping()
+       changelogReader.stop
+
+       val coordinatorSystemConsumer = coordinatorFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
+       if (migrationVerification(coordinatorSystemConsumer)) {
+         info("Migration %s was already performed, doing nothing" format migrationKey)
+       } else {
+         info("No previous migration for %s were detected, performing migration" format migrationKey)
+
+         val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, source)
+         checkpointManager.start()
+         for ((taskName, checkpoint) <- checkpointMap) {
+           checkpointManager.writeCheckpoint(taskName, checkpoint)
+         }
+
+         val changelogPartitionManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, source)
+         changelogPartitionManager.start()
+         changelogPartitionManager.writeChangeLogPartitionMapping(changelogMap)
+         changelogPartitionManager.stop()
+         checkpointManager.stop()
+
+         migrationCompletionMark(coordinatorSystemProducer)
+       }
+     }
+   }
+
+   /** Runs the migration with managers built by KafkaCheckpointManagerFactory from the given config. */
+   override def migrate(config: Config) {
+     val managerFactory = new KafkaCheckpointManagerFactory
+     migrate(config, () => managerFactory.getCheckpointManager(config, new NoOpMetricsRegistry))
+   }
+
+   /**
+    * Bootstraps the given coordinator stream consumer and checks whether the completion
+    * marker of this migration is among the bootstrapped migration messages. The consumer is
+    * stopped before the check.
+    *
+    * @return true if the completion marker is present
+    */
+   def migrationVerification(coordinatorSystemConsumer : CoordinatorStreamSystemConsumer): Boolean = {
+     coordinatorSystemConsumer.register()
+     coordinatorSystemConsumer.start()
+     coordinatorSystemConsumer.bootstrap()
+     val migrationMessages = coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE)
+     coordinatorSystemConsumer.stop()
+
+     val completionMarker = new SetMigrationMetaMessage(source, migrationKey, migrationVal)
+     migrationMessages.contains(completionMarker.asInstanceOf[CoordinatorStreamMessage])
+   }
+
+   /** Starts the producer, sends the completion marker for this migration, and stops the producer again. */
+   def migrationCompletionMark(coordinatorSystemProducer: CoordinatorStreamSystemProducer) = {
+     info("Marking completion of migration %s" format migrationKey)
+     val completionMarker = new SetMigrationMetaMessage(source, migrationKey, migrationVal)
+     coordinatorSystemProducer.start()
+     coordinatorSystemProducer.send(completionMarker)
+     coordinatorSystemProducer.stop()
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index c0c34bf..9a44d46 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -55,6 +55,7 @@ class KafkaSystemProducer(systemName: String,
if (producer != null) {
latestFuture.keys.foreach(flush(_))
producer.close
+ producer = null
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/841839de/samza-kafka/src/test/scala/old/checkpoint/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/old/checkpoint/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/old/checkpoint/TestKafkaCheckpointManager.scala
new file mode 100644
index 0000000..2c0304f
--- /dev/null
+++ b/samza-kafka/src/test/scala/old/checkpoint/TestKafkaCheckpointManager.scala
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package old.checkpoint
+
+import java.util.Properties
+import kafka.admin.AdminUtils
+import kafka.common.{InvalidMessageSizeException, UnknownTopicOrPartitionException}
+import kafka.message.InvalidMessageException
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{TestUtils, TestZKUtils, Utils, ZKStringSerializer}
+import kafka.zk.EmbeddedZookeeper
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.{Producer, ProducerRecord, KafkaProducer, ProducerConfig}
+import org.apache.samza.checkpoint.{CheckpointManager, Checkpoint}
+import org.apache.samza.config.SystemConfig._
+import org.apache.samza.config.{JobConfig, KafkaProducerConfig, MapConfig}
+import org.apache.samza.container.TaskName
+import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import org.apache.samza.coordinator.MockSystemFactory
+import org.apache.samza.coordinator.stream.messages.SetMigrationMetaMessage
+import org.apache.samza.coordinator.stream.{MockCoordinatorStreamSystemFactory, CoordinatorStreamSystemFactory}
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.serializers.CheckpointSerde
+import org.apache.samza.storage.ChangelogPartitionManager
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, TopicMetadataStore}
+import org.apache.samza.{Partition, SamzaException}
+import org.junit.Assert._
+import org.junit._
+import scala.collection.JavaConversions._
+import scala.collection._
+
+class TestKafkaCheckpointManager {
+
+ val checkpointTopic = "checkpoint-topic"
+ val serdeCheckpointTopic = "checkpoint-topic-invalid-serde"
+ val checkpointTopicConfig = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(null)
+ val zkConnect: String = TestZKUtils.zookeeperConnect
+ var zkClient: ZkClient = null
+ val zkConnectionTimeout = 6000
+ val zkSessionTimeout = 6000
+
+ val brokerId1 = 0
+ val brokerId2 = 1
+ val brokerId3 = 2
+ val ports = TestUtils.choosePorts(3)
+ val (port1, port2, port3) = (ports(0), ports(1), ports(2))
+
+ // Each broker gets its own config with controlled shutdown enabled. The setting was
+ // previously applied three times to props1 and never to props2 or props3.
+ val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+ props1.put("controlled.shutdown.enable", "true")
+ val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+ props2.put("controlled.shutdown.enable", "true")
+ val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
+ props3.put("controlled.shutdown.enable", "true")
+
+ val config = new java.util.HashMap[String, Object]()
+ val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
+ config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+ config.put("acks", "all")
+ config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+ config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(java.lang.Integer.MAX_VALUE-1)).toString)
+ config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
+ val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+ val partition = new Partition(0)
+ val partition2 = new Partition(1)
+ val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123"))
+ val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345"))
+ var zookeeper: EmbeddedZookeeper = null
+ var server1: KafkaServer = null
+ var server2: KafkaServer = null
+ var server3: KafkaServer = null
+ var metadataStore: TopicMetadataStore = null
+
+ val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName
+
+ /** Starts the embedded ZooKeeper and the three brokers, then creates the metadata store used by the tests. */
+ @Before
+ def beforeSetupServers: Unit = {
+   zookeeper = new EmbeddedZookeeper(zkConnect)
+
+   val config1 = new KafkaConfig(props1)
+   server1 = TestUtils.createServer(config1)
+   val config2 = new KafkaConfig(props2)
+   server2 = TestUtils.createServer(config2)
+   val config3 = new KafkaConfig(props3)
+   server3 = TestUtils.createServer(config3)
+
+   metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
+ }
+
+ /** Shuts the brokers down one after another, removes their log directories, then stops ZooKeeper. */
+ @After
+ def afterCleanLogDirs: Unit = {
+   val brokerServers = List(server1, server2, server3)
+
+   for (broker <- brokerServers) {
+     broker.shutdown
+     broker.awaitShutdown()
+   }
+   for (broker <- brokerServers) {
+     Utils.rm(broker.config.logDirs)
+   }
+
+   zookeeper.shutdown
+ }
+
+ /**
+  * Test helper: writes one checkpoint record for the task to partition 0 of the given topic
+  * and waits for the send to complete. A failed send is only printed, not rethrown.
+  */
+ private def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint, cpTopic: String = checkpointTopic) = {
+   val checkpointProducer: Producer[Array[Byte], Array[Byte]] = new KafkaProducer(producerConfig.getProducerProperties)
+   val checkpointRecord = new ProducerRecord(
+     cpTopic,
+     0,
+     KafkaCheckpointLogKey.getCheckpointKey(taskName).toBytes(),
+     new CheckpointSerde().toBytes(checkpoint)
+   )
+   try {
+     checkpointProducer.send(checkpointRecord).get()
+   } catch {
+     case sendFailure: Exception => println(sendFailure.getMessage)
+   } finally {
+     checkpointProducer.close()
+   }
+ }
+
+ /**
+  * Test helper: writes the changelog partition mapping as one record to partition 0 of the
+  * given topic and waits for the send to complete. A failed send is only printed, not rethrown.
+  */
+ private def writeChangeLogPartitionMapping(changelogMapping: Map[TaskName, Integer], cpTopic: String = checkpointTopic) = {
+   val mappingProducer: Producer[Array[Byte], Array[Byte]] = new KafkaProducer(producerConfig.getProducerProperties)
+   val mappingRecord = new ProducerRecord(
+     cpTopic,
+     0,
+     KafkaCheckpointLogKey.getChangelogPartitionMappingKey().toBytes(),
+     new CheckpointSerde().changelogPartitionMappingToBytes(changelogMapping)
+   )
+   try {
+     mappingProducer.send(mappingRecord).get()
+   } catch {
+     case sendFailure: Exception => println(sendFailure.getMessage)
+   } finally {
+     mappingProducer.close()
+   }
+ }
+
+ /**
+  * Test helper: creates the given topic with one partition and replication factor one,
+  * using the checkpoint topic configuration. A failure is only printed, not rethrown.
+  *
+  * @param cpTopic name of the topic to create; defaults to the standard checkpoint topic
+  */
+ private def createCheckpointTopic(cpTopic: String = checkpointTopic) = {
+   // Named differently from the class-level zkClient field to avoid shadowing it.
+   val adminZkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+   try {
+     AdminUtils.createTopic(
+       adminZkClient,
+       // Use the requested topic; the parameter used to be ignored in favour of checkpointTopic.
+       cpTopic,
+       1,
+       1,
+       checkpointTopicConfig)
+   } catch {
+     case e: Exception => println(e.getMessage)
+   } finally {
+     adminZkClient.close
+   }
+ }
+
+ /**
+  * Starting the legacy manager without an existing checkpoint topic must fail with the
+  * SamzaException raised by KafkaCheckpointManager.start.
+  */
+ @Test
+ def testStartFailureWithNoCheckpointTopic() {
+   try {
+     // Enable consumer caching
+     MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
+     val oldCheckpointManager = getKafkaCheckpointManager
+     oldCheckpointManager.start
+     fail("KafkaCheckpointManager start should have failed")
+   } catch {
+     case se: SamzaException =>
+       // JUnit's assertEquals takes the expected value first, then the actual one.
+       assertEquals(
+         "Failed to start KafkaCheckpointManager for non-existing checkpoint topic. " +
+           "KafkaCheckpointManager should only be used for migration purpose.",
+         se.getMessage)
+   }
+ }
+
+ /**
+  * Runs KafkaCheckpointMigration without writing anything to the old checkpoint topic
+  * (the test assumes that topic does not exist). Verifies that the migration does not
+  * create the topic, that no checkpoints and an empty changelog mapping are readable from
+  * the coordinator stream, and that exactly one SetMigrationMetaMessage is present afterwards.
+  */
+ @Test
+ def testMigrationWithNoCheckpointTopic() {
+   try {
+     val mapConfig = Map(
+       "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
+       JobConfig.JOB_NAME -> "test",
+       JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
+       JobConfig.JOB_CONTAINER_COUNT -> "2",
+       "task.inputs" -> "test.stream1",
+       "task.checkpoint.system" -> "test",
+       SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
+       SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName)
+     // Enable consumer caching
+     MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
+     val config = new MapConfig(mapConfig)
+     val migrate = new KafkaCheckpointMigration
+
+     // Tasks whose checkpoints are looked up after the migration; nothing is written for them
+     val task1 = new TaskName(partition.toString)
+     val task2 = new TaskName(partition2.toString)
+
+     // Nothing is migrated in this scenario, so the expected changelog mapping is empty
+     val changelogMapping = Map[TaskName, Integer]()
+
+     // Initialize coordinator stream
+     val coordinatorFactory = new CoordinatorStreamSystemFactory()
+     val coordinatorSystemConsumer = coordinatorFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
+     val coordinatorSystemProducer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
+     coordinatorSystemConsumer.register()
+     coordinatorSystemConsumer.start()
+
+     // No migration marker may exist before the migration has run
+     assertEquals(0, coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size)
+     coordinatorSystemConsumer.stop
+
+     // Start the migration
+     def getManager() = getKafkaCheckpointManager
+     migrate.migrate(config, getManager)
+     // Ensure migration step does not create the non-existing checkpoint topic
+     assertFalse(getManager.topicExists)
+
+     // Verify that no checkpoints were migrated
+     val newCheckpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, "test")
+     newCheckpointManager.register(task1)
+     newCheckpointManager.register(task2)
+     newCheckpointManager.start()
+     assertNull(newCheckpointManager.readLastCheckpoint(task1))
+     assertNull(newCheckpointManager.readLastCheckpoint(task2))
+     newCheckpointManager.stop()
+
+     // Verify that the migrated changelog partition mapping is empty
+     val newChangelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, "test")
+     newChangelogManager.start
+     val newChangelogMapping = newChangelogManager.readChangeLogPartitionMapping()
+     newChangelogManager.stop
+     assertEquals(changelogMapping, newChangelogMapping.toMap)
+
+     // Check for migration message
+     coordinatorSystemConsumer.register()
+     coordinatorSystemConsumer.start()
+     assertEquals(1, coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size)
+     coordinatorSystemConsumer.stop()
+   }
+   finally {
+     // The consumer cache is global mock state; always switch it back off
+     MockCoordinatorStreamSystemFactory.disableMockConsumerCache()
+   }
+ }
+
+ /**
+  * End-to-end migration test: writes checkpoints cp1/cp2 for two tasks and a changelog
+  * partition mapping into the old Kafka checkpoint topic, runs KafkaCheckpointMigration,
+  * and then verifies that the same checkpoints and mapping are readable through the
+  * coordinator-stream based managers and that exactly one SetMigrationMetaMessage exists.
+  */
+ @Test
+ def testMigration() {
+   try {
+     val mapConfig = Map(
+       "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
+       JobConfig.JOB_NAME -> "test",
+       JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
+       JobConfig.JOB_CONTAINER_COUNT -> "2",
+       "task.inputs" -> "test.stream1",
+       "task.checkpoint.system" -> "test",
+       SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
+       SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName)
+     // Enable consumer caching
+     MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
+     val config = new MapConfig(mapConfig)
+     val migrate = new KafkaCheckpointMigration
+     val oldCheckpointManager = getKafkaCheckpointManager
+
+     createCheckpointTopic()
+     oldCheckpointManager.validateTopic
+
+     // Write a couple of checkpoints in the old checkpoint topic
+     val task1 = new TaskName(partition.toString)
+     val task2 = new TaskName(partition2.toString)
+     writeCheckpoint(task1, cp1)
+     writeCheckpoint(task2, cp2)
+
+     // Box explicitly instead of casting an Int to java.lang.Integer
+     val changelogMapping = Map(task1 -> Integer.valueOf(1), task2 -> Integer.valueOf(10))
+     // Write changelog partition info to the old checkpoint topic
+     writeChangeLogPartitionMapping(changelogMapping)
+     oldCheckpointManager.stop
+
+     // Initialize coordinator stream
+     val coordinatorFactory = new CoordinatorStreamSystemFactory()
+     val coordinatorSystemConsumer = coordinatorFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
+     val coordinatorSystemProducer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
+     coordinatorSystemConsumer.register()
+     coordinatorSystemConsumer.start()
+
+     // No migration marker may exist before the migration has run
+     assertEquals(0, coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size)
+     coordinatorSystemConsumer.stop
+
+     // Start the migration
+     def getManager() = getKafkaCheckpointManager
+     migrate.migrate(config, getManager)
+
+     // Verify if the checkpoints have been migrated
+     val newCheckpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, "test")
+     newCheckpointManager.register(task1)
+     newCheckpointManager.register(task2)
+     newCheckpointManager.start()
+     assertEquals(cp1, newCheckpointManager.readLastCheckpoint(task1))
+     assertEquals(cp2, newCheckpointManager.readLastCheckpoint(task2))
+     newCheckpointManager.stop()
+
+     // Verify if the changelogPartitionInfo has been migrated
+     val newChangelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, "test")
+     newChangelogManager.start
+     val newChangelogMapping = newChangelogManager.readChangeLogPartitionMapping()
+     newChangelogManager.stop
+     assertEquals(changelogMapping, newChangelogMapping.toMap)
+
+     // Check for migration message
+     coordinatorSystemConsumer.register()
+     coordinatorSystemConsumer.start()
+     assertEquals(1, coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size)
+     coordinatorSystemConsumer.stop()
+   }
+   finally {
+     // The consumer cache is global mock state; always switch it back off
+     MockCoordinatorStreamSystemFactory.disableMockConsumerCache()
+   }
+ }
+
+ /**
+  * Read-path test for KafkaCheckpointManager: after the checkpoint topic is created and
+  * validated, checks the topic's cleanup.policy and segment.bytes settings, that reading
+  * returns null before any checkpoint is written, that written checkpoints are read back,
+  * and that reading for a task named after partition 1 raises a SamzaException.
+  */
+ @Test
+ def testCheckpointShouldBeNullIfCheckpointTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite {
+   val kcm = getKafkaCheckpointManager
+   try {
+     val taskName = new TaskName(partition.toString)
+     kcm.register(taskName)
+     createCheckpointTopic()
+     kcm.validateTopic
+     // check that log compaction is enabled.
+     val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+     val topicConfig = try {
+       AdminUtils.fetchTopicConfig(zkClient, checkpointTopic)
+     } finally {
+       // Release the ZooKeeper connection even if the lookup fails
+       zkClient.close()
+     }
+     assertEquals("compact", topicConfig.get("cleanup.policy"))
+     assertEquals("26214400", topicConfig.get("segment.bytes"))
+     // reading before any checkpoint has been written should result in a null checkpoint
+     var readCp = kcm.readLastCheckpoint(taskName)
+     assertNull(readCp)
+     // the first written checkpoint should be readable
+     writeCheckpoint(taskName, cp1)
+     readCp = kcm.readLastCheckpoint(taskName)
+     assertEquals(cp1, readCp)
+     // should get an exception if partition doesn't exist
+     try {
+       readCp = kcm.readLastCheckpoint(new TaskName(new Partition(1).toString))
+       fail("Expected a SamzaException, since only one partition (partition 0) should exist.")
+     } catch {
+       case _: SamzaException => // expected
+       case _: Exception => fail("Expected a SamzaException, since only one partition (partition 0) should exist.")
+     }
+     // writing a second message should work, too
+     writeCheckpoint(taskName, cp2)
+     readCp = kcm.readLastCheckpoint(taskName)
+     assertEquals(cp2, readCp)
+   } finally {
+     // Stop the manager even when an assertion above fails
+     kcm.stop
+   }
+ }
+
+ /**
+  * For each listed Kafka exception name, builds a checkpoint manager whose serde throws
+  * that exception from fromBytes, writes one checkpoint to the serde test topic, and
+  * expects readLastCheckpoint to fail with a KafkaCheckpointException.
+  */
+ @Test
+ def testUnrecoverableKafkaErrorShouldThrowKafkaCheckpointManagerException {
+   val exceptions = List("InvalidMessageException", "InvalidMessageSizeException", "UnknownTopicOrPartitionException")
+   exceptions.foreach { exceptionName =>
+     val kcm = getKafkaCheckpointManagerWithInvalidSerde(exceptionName)
+     try {
+       val taskName = new TaskName(partition.toString)
+       kcm.register(taskName)
+       createCheckpointTopic(serdeCheckpointTopic)
+       kcm.validateTopic
+       writeCheckpoint(taskName, cp1, serdeCheckpointTopic)
+       // because serde will throw unrecoverable errors, it should result a KafkaCheckpointException
+       try {
+         kcm.readLastCheckpoint(taskName)
+         fail("Expected a KafkaCheckpointException.")
+       } catch {
+         case _: KafkaCheckpointException => // expected
+       }
+     } finally {
+       // Stop this iteration's manager even if the expectation above is not met
+       kcm.stop
+     }
+   }
+ }
+
+ /**
+  * Builds a new KafkaCheckpointManager for `checkpointTopic` on every call, wired to the
+  * test's metadata store, producer config and ZooKeeper connect string, with checkpoint
+  * topic properties derived from an empty config.
+  */
+ private def getKafkaCheckpointManager = {
+   val socketTimeoutMs = 30000
+   val bufferBytes = 64 * 1024
+   val fetchBytes = 300 * 1024
+   new KafkaCheckpointManager(
+     clientId = "some-client-id",
+     checkpointTopic = checkpointTopic,
+     systemName = "kafka",
+     socketTimeout = socketTimeoutMs,
+     bufferSize = bufferBytes,
+     fetchSize = fetchBytes,
+     metadataStore = metadataStore,
+     // Producer and ZooKeeper clients are supplied as factories and created on demand
+     connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
+     connectZk = () => new ZkClient(zkConnect, 60000, 60000, ZKStringSerializer),
+     systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
+     checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
+ }
+
+ /**
+  * Builds a new KafkaCheckpointManager for `serdeCheckpointTopic` with an injected serde.
+  * Kafka exceptions will be thrown when serde.fromBytes is called.
+  *
+  * @param exception simple name of the Kafka exception the injected serde should throw
+  */
+ private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = {
+   val socketTimeoutMs = 30000
+   val bufferBytes = 64 * 1024
+   val fetchBytes = 300 * 1024
+   new KafkaCheckpointManager(
+     clientId = "some-client-id-invalid-serde",
+     checkpointTopic = serdeCheckpointTopic,
+     systemName = "kafka",
+     socketTimeout = socketTimeoutMs,
+     bufferSize = bufferBytes,
+     fetchSize = fetchBytes,
+     metadataStore = metadataStore,
+     // Producer and ZooKeeper clients are supplied as factories and created on demand
+     connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
+     connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
+     systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
+     serde = new InvalideSerde(exception),
+     checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
+ }
+
+ /**
+  * CheckpointSerde whose fromBytes never returns: it throws the Kafka exception selected
+  * by name at construction time. A name outside the three handled ones results in a
+  * scala.MatchError.
+  *
+  * @param exception simple name of the Kafka exception to throw from fromBytes
+  */
+ class InvalideSerde(exception: String) extends CheckpointSerde {
+   override def fromBytes(bytes: Array[Byte]): Checkpoint = {
+     // Pick the failure first, then raise it in one place
+     val failure: Throwable = exception match {
+       case "InvalidMessageException" => new InvalidMessageException
+       case "InvalidMessageSizeException" => new InvalidMessageSizeException
+       case "UnknownTopicOrPartitionException" => new UnknownTopicOrPartitionException
+     }
+     throw failure
+   }
+ }
+}