You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jg...@apache.org on 2014/04/08 03:40:26 UTC
git commit: SAMZA-221: TestKafkaCheckpointManager is in the wrong
package.
Repository: incubator-samza
Updated Branches:
refs/heads/master 7b4204572 -> b3b485cce
SAMZA-221: TestKafkaCheckpointManager is in the wrong package.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/b3b485cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/b3b485cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/b3b485cc
Branch: refs/heads/master
Commit: b3b485cce56dec3522d86f708c1a55e92d8e1a82
Parents: 7b42045
Author: Yan Fang <yanfang724 at gmail dot com>
Authored: Mon Apr 7 18:40:01 2014 -0700
Committer: Jakob Homan <jg...@apache.org>
Committed: Mon Apr 7 18:40:01 2014 -0700
----------------------------------------------------------------------
.../checkpoint/TestKafkaCheckpointManager.scala | 144 -------------------
.../kafka/TestKafkaCheckpointManager.scala | 144 +++++++++++++++++++
2 files changed, 144 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b3b485cc/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala
deleted file mode 100644
index f1a8f8a..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/TestKafkaCheckpointManager.scala
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint.kafka
-
-import org.I0Itec.zkclient.ZkClient
-import org.junit.Assert._
-import org.junit.AfterClass
-import org.junit.BeforeClass
-import org.junit.Test
-import kafka.producer.Producer
-import kafka.producer.ProducerConfig
-import kafka.server.KafkaConfig
-import kafka.server.KafkaServer
-import kafka.utils.TestUtils
-import kafka.utils.TestZKUtils
-import kafka.utils.Utils
-import kafka.zk.EmbeddedZookeeper
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.Partition
-import scala.collection._
-import scala.collection.JavaConversions._
-import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore }
-import org.apache.samza.config.MapConfig
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.system.SystemStream
-import kafka.utils.ZKStringSerializer
-
-object TestKafkaCheckpointManager {
- val zkConnect: String = TestZKUtils.zookeeperConnect
- var zkClient: ZkClient = null
- val zkConnectionTimeout = 6000
- val zkSessionTimeout = 6000
-
- val brokerId1 = 0
- val brokerId2 = 1
- val brokerId3 = 2
- val ports = TestUtils.choosePorts(3)
- val (port1, port2, port3) = (ports(0), ports(1), ports(2))
-
- val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
- props1.put("controlled.shutdown.enable", "true")
- val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
- props1.put("controlled.shutdown.enable", "true")
- val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
- props1.put("controlled.shutdown.enable", "true")
-
- val config = new java.util.Properties()
- val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
- config.put("metadata.broker.list", brokers)
- config.put("producer.type", "sync")
- config.put("request.required.acks", "-1")
- val producerConfig = new ProducerConfig(config)
- val partition = new Partition(0)
- val cp1 = new Checkpoint(Map(new SystemStream("kafka", "topic") -> "123"))
- val cp2 = new Checkpoint(Map(new SystemStream("kafka", "topic") -> "12345"))
- var zookeeper: EmbeddedZookeeper = null
- var server1: KafkaServer = null
- var server2: KafkaServer = null
- var server3: KafkaServer = null
- var metadataStore: TopicMetadataStore = null
-
- @BeforeClass
- def beforeSetupServers {
- zookeeper = new EmbeddedZookeeper(zkConnect)
- server1 = TestUtils.createServer(new KafkaConfig(props1))
- server2 = TestUtils.createServer(new KafkaConfig(props2))
- server3 = TestUtils.createServer(new KafkaConfig(props3))
- metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
- }
-
- @AfterClass
- def afterCleanLogDirs {
- server1.shutdown
- server1.awaitShutdown()
- server2.shutdown
- server2.awaitShutdown()
- server3.shutdown
- server3.awaitShutdown()
- Utils.rm(server1.config.logDirs)
- Utils.rm(server2.config.logDirs)
- Utils.rm(server3.config.logDirs)
- zookeeper.shutdown
- }
-}
-
-class TestKafkaCheckpointManager {
- import TestKafkaCheckpointManager._
-
- @Test
- def testCheckpointShouldBeNullIfcheckpointTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite {
- val kcm = getKafkaCheckpointManager
- kcm.register(partition)
- kcm.start
- var readCp = kcm.readLastCheckpoint(partition)
- // read before topic exists should result in a null checkpoint
- assert(readCp == null)
- // create topic the first time around
- kcm.writeCheckpoint(partition, cp1)
- readCp = kcm.readLastCheckpoint(partition)
- assert(cp1.equals(readCp))
- // should get an exception if partition doesn't exist
- try {
- readCp = kcm.readLastCheckpoint(new Partition(1))
- fail("Expected a SamzaException, since only one partition (partition 0) should exist.")
- } catch {
- case e: Exception => None // expected
- }
- // writing a second message should work, too
- kcm.writeCheckpoint(partition, cp2)
- readCp = kcm.readLastCheckpoint(partition)
- assert(cp2.equals(readCp))
- kcm.stop
- }
-
- private def getKafkaCheckpointManager = new KafkaCheckpointManager(
- clientId = "some-client-id",
- checkpointTopic = "checkpoint-topic",
- systemName = "kafka",
- totalPartitions = 1,
- replicationFactor = 3,
- socketTimeout = 30000,
- bufferSize = 64 * 1024,
- fetchSize = 300 * 1024,
- metadataStore = metadataStore,
- connectProducer = () => new Producer[Partition, Array[Byte]](producerConfig),
- connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer))
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/b3b485cc/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
new file mode 100644
index 0000000..f1a8f8a
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka
+
+import org.I0Itec.zkclient.ZkClient
+import org.junit.Assert._
+import org.junit.AfterClass
+import org.junit.BeforeClass
+import org.junit.Test
+import kafka.producer.Producer
+import kafka.producer.ProducerConfig
+import kafka.server.KafkaConfig
+import kafka.server.KafkaServer
+import kafka.utils.TestUtils
+import kafka.utils.TestZKUtils
+import kafka.utils.Utils
+import kafka.zk.EmbeddedZookeeper
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.Partition
+import scala.collection._
+import scala.collection.JavaConversions._
+import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore }
+import org.apache.samza.config.MapConfig
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.system.SystemStream
+import kafka.utils.ZKStringSerializer
+
+object TestKafkaCheckpointManager {
+ val zkConnect: String = TestZKUtils.zookeeperConnect
+ var zkClient: ZkClient = null
+ val zkConnectionTimeout = 6000
+ val zkSessionTimeout = 6000
+
+ val brokerId1 = 0
+ val brokerId2 = 1
+ val brokerId3 = 2
+ val ports = TestUtils.choosePorts(3)
+ val (port1, port2, port3) = (ports(0), ports(1), ports(2))
+
+ val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+ props1.put("controlled.shutdown.enable", "true")
+ val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+ props1.put("controlled.shutdown.enable", "true")
+ val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
+ props1.put("controlled.shutdown.enable", "true")
+
+ val config = new java.util.Properties()
+ val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
+ config.put("metadata.broker.list", brokers)
+ config.put("producer.type", "sync")
+ config.put("request.required.acks", "-1")
+ val producerConfig = new ProducerConfig(config)
+ val partition = new Partition(0)
+ val cp1 = new Checkpoint(Map(new SystemStream("kafka", "topic") -> "123"))
+ val cp2 = new Checkpoint(Map(new SystemStream("kafka", "topic") -> "12345"))
+ var zookeeper: EmbeddedZookeeper = null
+ var server1: KafkaServer = null
+ var server2: KafkaServer = null
+ var server3: KafkaServer = null
+ var metadataStore: TopicMetadataStore = null
+
+ @BeforeClass
+ def beforeSetupServers {
+ zookeeper = new EmbeddedZookeeper(zkConnect)
+ server1 = TestUtils.createServer(new KafkaConfig(props1))
+ server2 = TestUtils.createServer(new KafkaConfig(props2))
+ server3 = TestUtils.createServer(new KafkaConfig(props3))
+ metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
+ }
+
+ @AfterClass
+ def afterCleanLogDirs {
+ server1.shutdown
+ server1.awaitShutdown()
+ server2.shutdown
+ server2.awaitShutdown()
+ server3.shutdown
+ server3.awaitShutdown()
+ Utils.rm(server1.config.logDirs)
+ Utils.rm(server2.config.logDirs)
+ Utils.rm(server3.config.logDirs)
+ zookeeper.shutdown
+ }
+}
+
+class TestKafkaCheckpointManager {
+ import TestKafkaCheckpointManager._
+
+ @Test
+ def testCheckpointShouldBeNullIfcheckpointTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite {
+ val kcm = getKafkaCheckpointManager
+ kcm.register(partition)
+ kcm.start
+ var readCp = kcm.readLastCheckpoint(partition)
+ // read before topic exists should result in a null checkpoint
+ assert(readCp == null)
+ // create topic the first time around
+ kcm.writeCheckpoint(partition, cp1)
+ readCp = kcm.readLastCheckpoint(partition)
+ assert(cp1.equals(readCp))
+ // should get an exception if partition doesn't exist
+ try {
+ readCp = kcm.readLastCheckpoint(new Partition(1))
+ fail("Expected a SamzaException, since only one partition (partition 0) should exist.")
+ } catch {
+ case e: Exception => None // expected
+ }
+ // writing a second message should work, too
+ kcm.writeCheckpoint(partition, cp2)
+ readCp = kcm.readLastCheckpoint(partition)
+ assert(cp2.equals(readCp))
+ kcm.stop
+ }
+
+ private def getKafkaCheckpointManager = new KafkaCheckpointManager(
+ clientId = "some-client-id",
+ checkpointTopic = "checkpoint-topic",
+ systemName = "kafka",
+ totalPartitions = 1,
+ replicationFactor = 3,
+ socketTimeout = 30000,
+ bufferSize = 64 * 1024,
+ fetchSize = 300 * 1024,
+ metadataStore = metadataStore,
+ connectProducer = () => new Producer[Partition, Array[Byte]](producerConfig),
+ connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer))
+}