You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/02/26 23:47:39 UTC
[1/2] samza git commit: SAMZA-394; fix legacy integration tests
Repository: samza
Updated Branches:
refs/heads/master a984a6a2c -> 04f7ff7b4
SAMZA-394; fix legacy integration tests
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/87c4ab65
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/87c4ab65
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/87c4ab65
Branch: refs/heads/master
Commit: 87c4ab658986ac13e9a84b1dbde740c1cbebac73
Parents: a984a6a
Author: Navina Ramesh <na...@gmail.com>
Authored: Thu Feb 26 14:41:17 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Thu Feb 26 14:41:17 2015 -0800
----------------------------------------------------------------------
bin/setup-int-test.sh | 49 ++++++
build.gradle | 2 +
.../apache/samza/container/SamzaContainer.scala | 1 +
.../scala/org/apache/samza/job/JobRunner.scala | 2 +-
.../samza/job/local/ProcessJobFactory.scala | 8 +-
.../src/main/config/hello-stateful-world.samsa | 31 ----
samza-test/src/main/config/join/README | 77 +++++++++
samza-test/src/main/config/join/checker.samsa | 36 ----
samza-test/src/main/config/join/checker.samza | 38 +++++
.../src/main/config/join/common.properties | 46 +++++
samza-test/src/main/config/join/emitter.samsa | 33 ----
samza-test/src/main/config/join/emitter.samza | 35 ++++
samza-test/src/main/config/join/joiner.samsa | 31 ----
samza-test/src/main/config/join/joiner.samza | 32 ++++
samza-test/src/main/config/join/reset.sh | 47 ++++++
samza-test/src/main/config/join/watcher.samsa | 31 ----
samza-test/src/main/config/join/watcher.samza | 28 ++++
.../src/main/config/negate-number.properties | 2 +
.../test/integration/SimpleStatefulTask.java | 5 +
.../test/integration/StatePerfTestTask.java | 3 +
.../samza/test/integration/join/Checker.java | 60 +++++--
.../samza/test/integration/join/Emitter.java | 33 ++--
.../test/integration/join/EpochPartitioner.java | 5 +-
.../samza/test/integration/join/Joiner.java | 24 ++-
.../samza/test/integration/join/Watcher.java | 42 +----
.../src/main/python/samza_failure_testing.py | 166 +++++++++++++++++++
.../main/resources/hello-stateful-world.samza | 31 ++++
samza-test/src/main/resources/log4j.xml | 11 +-
28 files changed, 671 insertions(+), 238 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/bin/setup-int-test.sh
----------------------------------------------------------------------
diff --git a/bin/setup-int-test.sh b/bin/setup-int-test.sh
new file mode 100755
index 0000000..112bda6
--- /dev/null
+++ b/bin/setup-int-test.sh
@@ -0,0 +1,49 @@
+#!/bin/bash -e
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This script sets up the environment for the integration tests and kick-starts the Samza jobs
+
+if [ $# -eq 0 ] || [ -z "$1" ] ; then
+ echo "Usage: ./bin/setup-int-test.sh <DEPLOY_DIR>"
+ exit -1
+fi
+
+DEPLOY_ROOT_DIR=$1
+KAFKA_DIR=$1/kafka
+SAMZA_DIR=$1/samza
+
+# Setup the deployment Grid
+# $BASE_DIR/bin/grid.sh bootstrap
+
+# sleep 10
+
+# Setup the topics
+$KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --create --topic epoch
+$KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --create --topic emitter-state
+$KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --create --topic emitted
+$KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --create --topic joiner-state
+$KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --create --topic completed-keys
+$KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --create --topic checker-state
+
+# Start the jobs
+for job in checker joiner emitter watcher
+do
+ $SAMZA_DIR/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$SAMZA_DIR/config/join/common.properties --config-path=file://$SAMZA_DIR/config/join/$job.samza --config job.foo=$job
+done
+
+
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 9ca6cdc..97d848a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -80,6 +80,7 @@ rat {
'samza-test/state/mystore/**',
'README.md',
'RELEASE.md',
+ 'samza-test/src/main/resources/**'
]
}
@@ -382,6 +383,7 @@ project(":samza-test_$scalaVersion") {
from(file("$projectDir/src/main/config")) { into "config/" }
from(file("$projectDir/src/main/resources")) { into "lib/" }
from(project(':samza-shell').file("src/main/bash")) { into "bin/" }
+ from(file("$projectDir/src/main/python/ghostface_killah.py")) { into "bin/"}
from(project(':samza-shell').file("src/main/resources")) { into "lib/" }
from(project(':samza-shell').file("src/main/resources/log4j-console.xml")) { into "bin/" }
from '../LICENSE'
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 2fc6c65..e3b9d30 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -79,6 +79,7 @@ object SamzaContainer extends Logging {
// exception in the main method.
val containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID).toInt
val coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL)
+ logger.info("######### Coordinator URL in SafeMain() - " + coordinatorUrl)
val jobModel = readJobModel(coordinatorUrl)
val containerModel = jobModel.getContainers()(containerId.toInt)
val config = jobModel.getConfig
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 16345cd..0b720ec 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -67,7 +67,7 @@ class JobRunner(config: Config) extends Logging with Runnable {
Option(job.waitForStatus(Running, 500)) match {
case Some(appStatus) => {
if (Running.equals(appStatus)) {
- info("job started successfully")
+ info("job started successfully - " + appStatus)
} else {
warn("unable to start job successfully. job has status %s" format (appStatus))
}
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 6985af6..b80d349 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -35,7 +35,7 @@ import org.apache.samza.coordinator.JobCoordinator
* Creates a stand alone ProcessJob with the specified config.
*/
class ProcessJobFactory extends StreamJobFactory with Logging {
- def getJob(config: Config): StreamJob = {
+ def getJob(config: Config): StreamJob = {
val coordinator = JobCoordinator(config, 1)
val containerModel = coordinator.jobModel.getContainers.get(0)
@@ -53,16 +53,16 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
}
}
}
+ coordinator.start
commandBuilder
.setConfig(config)
.setId(0)
-
- coordinator.start
+ .setUrl(coordinator.server.getUrl)
new ProcessJob(commandBuilder)
} finally {
- coordinator.stop
+// coordinator.stop
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/config/hello-stateful-world.samsa
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/hello-stateful-world.samsa b/samza-test/src/main/config/hello-stateful-world.samsa
deleted file mode 100644
index 745f881..0000000
--- a/samza-test/src/main/config/hello-stateful-world.samsa
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Job
-job.factory.class=samza.job.local.ThreadJobFactory
-job.name=hello-stateful-world
-
-# Task
-task.class=samza.test.integration.SimpleStatefulTask
-task.inputs=kafka.input
-
-# Stores
-stores.mystore.factory=samza.storage.kv.KeyValueStorageEngineFactory
-stores.mystore.key.serde=string
-stores.mystore.msg.serde=string
-stores.mystore.changelog=kafka.mystore
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/config/join/README
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/join/README b/samza-test/src/main/config/join/README
new file mode 100644
index 0000000..27bda8b
--- /dev/null
+++ b/samza-test/src/main/config/join/README
@@ -0,0 +1,77 @@
+INTEGRATION TEST
+
+* What does the test do? *
+This is a correctness test that exercises partitioned messaging and local state. It is meant to be run while killing Samza and Kafka machines to test fault tolerance.
+It runs in iterations, and each iteration has a correctness criterion that is checked before the next iteration is launched. Here are the jobs and their functions:
+
+emitter.samza:
+ This job takes input from the "epoch" topic. Epochs are numbered 0, 1, 2, ...
+ For each new epoch each emitter task does something like the following:
+ for i = 0...count:
+ send("emitted", i, partition)
+ where partition is the task partition id.
+
+joiner.samza:
+ This job takes in the emitted values from emitter and joins them together by key.
+ When it has received an emitted value from each partition, it outputs the key to the topic "completed-keys".
+ To track which partitions have emitted their value, it keeps a store with |-separated numbers.
+ The first entry is the epoch and the remaining entries are partitions that have emitted the key.
+
+checker.samza:
+ This job has a single partition and stores all the completed keys. When all the keys are completed it sends an incremented epoch to the epoch topic, kicking off a new round.
+
+watcher.samza:
+ This job watches the epoch topic. If the epoch doesn't advance within some SLA this job sends an alert email.
+
+The state maintained by some of these jobs is slightly complex because of the need to make everything idempotent. So, for example, instead of keeping the partition count
+in the joiner job we keep the set of partitions so that double counting cannot occur.
+
+To run, simply start all four jobs at once.
+
+* How do I set up the test? *
+
+NOTE: You will need to have Paramiko installed in order to run these tests.
+
+This test is meant to be used with hello-samza's bin/grid script. If you use hello-samza and run bin/grid bootstrap, then DEPLOY_DIR should be set to samza-hello-samza/deploy (the directory containing the kafka and samza subdirectories),
+and you can run the integration tests with ./bin/setup-int-test.sh path/to/samza-hello-samza/deploy.
+
+The steps to set up the integration tests are similar to the samza-hello-samza setup, using the grid script.
+Once you deploy ZooKeeper, YARN and Kafka, you have to generate the tarball with the tests.
+Before generating the tarball, update "yarn.package.path" in $SAMZA_SRC/samza-test/src/main/config/join/common.properties to the path where the published tarball will be made available.
+
+--> Release the tar
+cd $SAMZA_SRC
+./gradlew releaseTestJobs
+cp $SAMZA_SRC/samza-test/build/distributions/samza-test_*.tgz $DEPLOY_DIR
+mkdir -p $DEPLOY_DIR/samza && tar -xvf $DEPLOY_DIR/samza-test_*.tgz -C $DEPLOY_DIR/samza
+
+--> Create the topics and start the samza jobs
+./bin/setup-int-test.sh $DEPLOY_DIR
+
+Now you should be able to see all 4 jobs running in the YARN UI.
+
+FAILURE TESTING:
+
+* What does the test do? *
+This is used to test the resilience of the system. It periodically brings down a random container or Kafka broker in the system and waits to see if it recovers correctly.
+
+* How to setup test? *
+Ensure that the 4 jobs are running, using the YARN UI to check.
+In order to trigger the failure testing, run the python script: $SAMZA_SRC/samza-test/src/main/python/samza_failure_testing.py
+
+Usage: samza_failure_testing.py [options]
+
+Options:
+ -h, --help show this help message and exit
+ --node-list=nodes.txt
+ A list of nodes in the YARN cluster
+ --kill-time=s The time in seconds to sleep between kills
+ --kafka-dir=dir The directory in which to find kafka
+ --kafka-host=localhost
+ Host on which Kafka is installed
+ --yarn-dir=dir The directory in which to find yarn
+ --kill-kafka Should we kill Kafka?
+ --kill-container Should we kill Application Container?
+ --yarn-host=localhost
+ Host that will respond to Yarn REST queries
+
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/config/join/checker.samsa
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/join/checker.samsa b/samza-test/src/main/config/join/checker.samsa
deleted file mode 100644
index 6a6b9cd..0000000
--- a/samza-test/src/main/config/join/checker.samsa
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Job
-job.name=checker
-
-systems.kafka.partitioner.class=samza.test.integration.join.EpochPartitioner
-
-# Task
-task.class=samza.test.integration.join.Checker
-task.inputs=kafka.completed-keys
-
-stores.checker-state.factory=samza.storage.kv.KeyValueStorageEngineFactory
-stores.checker-state.key.serde=string
-stores.checker-state.msg.serde=string
-stores.checker-state.changelog=kafka.checker-state
-
-task.window.ms=300000
-
-num.partitions=4
-expected.keys=100000
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/config/join/checker.samza
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/join/checker.samza b/samza-test/src/main/config/join/checker.samza
new file mode 100644
index 0000000..ab557d9
--- /dev/null
+++ b/samza-test/src/main/config/join/checker.samza
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.name=checker
+
+systems.kafka.partitioner.class=org.apache.samza.test.integration.join.EpochPartitioner
+
+# Task
+task.class=org.apache.samza.test.integration.join.Checker
+task.inputs=kafka.completed-keys
+
+stores.checker-state.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.checker-state.key.serde=string
+stores.checker-state.msg.serde=string
+stores.checker-state.changelog=kafka.checker-state
+stores.checker-state.changelog.replication.factor=1
+
+task.window.ms=30000
+
+num.partitions=2
+expected.keys=5000
+
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/config/join/common.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/join/common.properties b/samza-test/src/main/config/join/common.properties
new file mode 100644
index 0000000..ad10aac
--- /dev/null
+++ b/samza-test/src/main/config/join/common.properties
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+####################
+### UPDATE THIS! ###
+####################
+yarn.package.path=<YARN.PACKAGE.PATH>
+
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
+task.checkpoint.system=kafka-checkpoints
+task.checkpoint.replication.factor=1
+
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+ # Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.consumer.auto.offset.reset=smallest
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.samza.key.serde=string
+systems.kafka.samza.msg.serde=string
+
+ # Checkpoints System
+systems.kafka-checkpoints.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka-checkpoints.producer.bootstrap.servers=localhost:9092
+systems.kafka-checkpoints.consumer.zookeeper.connect=localhost:2181
+
+yarn.container.retry.count=-1
+yarn.container.retry.window.ms=60000
+
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/config/join/emitter.samsa
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/join/emitter.samsa b/samza-test/src/main/config/join/emitter.samsa
deleted file mode 100644
index 5e94322..0000000
--- a/samza-test/src/main/config/join/emitter.samsa
+++ /dev/null
@@ -1,33 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Job
-job.name=emitter
-
-# Task
-task.class=samza.test.integration.join.Emitter
-task.inputs=kafka.epoch
-
-stores.emitter-state.factory=samza.storage.kv.KeyValueStorageEngineFactory
-stores.emitter-state.key.serde=string
-stores.emitter-state.msg.serde=string
-stores.emitter-state.changelog=kafka.emitter-state
-
-task.window.ms=0
-
-count=100000
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/config/join/emitter.samza
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/join/emitter.samza b/samza-test/src/main/config/join/emitter.samza
new file mode 100644
index 0000000..50379a3
--- /dev/null
+++ b/samza-test/src/main/config/join/emitter.samza
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.name=emitter
+
+# Task
+task.class=org.apache.samza.test.integration.join.Emitter
+task.inputs=kafka.epoch
+task.commit.ms=1000
+task.window.ms=5
+
+
+stores.emitter-state.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.emitter-state.key.serde=string
+stores.emitter-state.msg.serde=string
+stores.emitter-state.changelog=kafka.emitter-state
+stores.emitter-state.changelog.replication.factor=1
+
+count=5000
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/config/join/joiner.samsa
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/join/joiner.samsa b/samza-test/src/main/config/join/joiner.samsa
deleted file mode 100644
index 4ecee2b..0000000
--- a/samza-test/src/main/config/join/joiner.samsa
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Job
-job.name=joiner
-
-# Task
-task.class=samza.test.integration.join.Joiner
-task.inputs=kafka.emitted
-
-stores.joiner-state.factory=samza.storage.kv.KeyValueStorageEngineFactory
-stores.joiner-state.key.serde=string
-stores.joiner-state.msg.serde=string
-stores.joiner-state.changelog=kafka.checker-state
-
-num.partitions=4
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/config/join/joiner.samza
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/join/joiner.samza b/samza-test/src/main/config/join/joiner.samza
new file mode 100644
index 0000000..a138e9e
--- /dev/null
+++ b/samza-test/src/main/config/join/joiner.samza
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.name=joiner
+
+# Task
+task.class=org.apache.samza.test.integration.join.Joiner
+task.inputs=kafka.emitted
+
+stores.joiner-state.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.joiner-state.key.serde=string
+stores.joiner-state.msg.serde=string
+stores.joiner-state.changelog=kafka.joiner-state
+stores.joiner-state.changelog.replication.factor=1
+
+num.partitions=2
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/config/join/reset.sh
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/join/reset.sh b/samza-test/src/main/config/join/reset.sh
new file mode 100644
index 0000000..f94bded
--- /dev/null
+++ b/samza-test/src/main/config/join/reset.sh
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#!/bin/bash
+
+###############################################################################################
+## setup script for join test--delete all kafka and zk data and restart them in a fresh state
+###############################################################################################
+
+echo "Shutting down and cleaning up..."
+pkill -f kafka.Kafka
+pkill -f org.apache.zookeeper.server.quorum.QuorumPeerMain
+sleep 5
+rm -rf /tmp/kafka-logs /tmp/zookeeper
+sleep 2
+
+echo "Starting zk and kafka..."
+bin/zookeeper-server-start.sh config/zookeeper.properties &
+sleep 5
+bin/kafka-server-start.sh config/server.properties &
+sleep 5
+
+echo "Creating topics..."
+bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --create --topic epoch
+bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --create --topic emitter-state
+bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --create --topic emitted
+bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --create --topic joiner-state
+bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --create --topic completed-keys
+bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --create --topic checker-state
+
+echo "all done"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/config/join/watcher.samsa
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/join/watcher.samsa b/samza-test/src/main/config/join/watcher.samsa
deleted file mode 100644
index ecaa502..0000000
--- a/samza-test/src/main/config/join/watcher.samsa
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Job
-job.name=watcher
-
-# Task
-task.class=samza.test.integration.join.Joiner
-task.inputs=kafka.epoch
-
-task.window.ms=300000
-
-max.time.between.epochs.ms=600000
-mail.smtp.host=TODO
-mail.to=dev@samza.apache.org
-mail.from=gregor@incubator.apache.org
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/config/join/watcher.samza
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/join/watcher.samza b/samza-test/src/main/config/join/watcher.samza
new file mode 100644
index 0000000..3e2bf7e
--- /dev/null
+++ b/samza-test/src/main/config/join/watcher.samza
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.name=watcher
+
+# Task
+task.class=org.apache.samza.test.integration.join.Watcher
+task.inputs=kafka.epoch
+
+task.window.ms=40000
+
+max.time.between.epochs.ms=60000
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/config/negate-number.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/negate-number.properties b/samza-test/src/main/config/negate-number.properties
index b9f898c..2d2f75a 100644
--- a/samza-test/src/main/config/negate-number.properties
+++ b/samza-test/src/main/config/negate-number.properties
@@ -27,6 +27,8 @@ task.class=org.apache.samza.test.integration.NegateNumberTask
task.inputs=kafka.samza-test-topic
task.max.messages=50
task.outputs=kafka.samza-test-topic-output
+task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
+task.checkpoint.replication.factor=1
# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
index 4dbcb75..67e56e0 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
@@ -31,6 +31,11 @@ import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.TaskCoordinator.RequestScope;
+/**
+ * This is a simple task that writes each message to a state store and prints them all out on reload.
+ *
+ * It is useful for command line testing with the kafka console producer and consumer and text messages.
+ */
public class SimpleStatefulTask implements StreamTask, InitableTask {
private KeyValueStore<String, String> store;
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
index 77f770e..d7fecd8 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
@@ -29,6 +29,9 @@ import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.TaskCoordinator.RequestScope;
+/**
+ * A simple performance test that just reads in messages and writes them to a state store as quickly as possible and periodically prints out throughput numbers
+ */
public class StatePerfTestTask implements StreamTask, InitableTask {
private KeyValueStore<String, String> store;
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
index 2a2177a..1fef1ea 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
@@ -20,6 +20,7 @@
package org.apache.samza.test.integration.join;
import org.apache.samza.config.Config;
+import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
@@ -31,9 +32,17 @@ import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;
-import static java.lang.System.out;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
public class Checker implements StreamTask, WindowableTask, InitableTask {
+
+ private static Logger logger = LoggerFactory.getLogger(Checker.class);
private static String CURRENT_EPOCH = "current-epoch";
private KeyValueStore<String, String> store;
@@ -52,40 +61,63 @@ public class Checker implements StreamTask, WindowableTask, InitableTask {
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
String key = (String) envelope.getKey();
String epoch = (String) envelope.getMessage();
+ logger.info("Got key=" + key + ", epoch = " + epoch + " in checker...");
checkEpoch(epoch);
this.store.put(key, epoch);
}
@Override
public void window(MessageCollector collector, TaskCoordinator coordinator) {
- KeyValueIterator<String, String> iter = this.store.all();
String currentEpoch = this.store.get(CURRENT_EPOCH);
- out.println("Checking if epoch " + currentEpoch + " is complete.");
+ logger.info("Checking if epoch " + currentEpoch + " is complete.");
int count = 0;
+ KeyValueIterator<String, String> iter = this.store.all();
+
while(iter.hasNext()) {
- String foundEpoch = iter.next().getValue();
- if(foundEpoch.equals(currentEpoch))
- count += 1;
+ Entry<String, String> entry= iter.next();
+ String foundEpoch = entry.getValue();
+ if(foundEpoch.equals(currentEpoch)) {
+ count += 1;
+ } else {
+ logger.info("####### Found a different epoch! - " + foundEpoch + " Current epoch is " + currentEpoch);
+ }
}
iter.close();
if(count == expectedKeys + 1) {
- out.println("Epoch " + currentEpoch + " is complete.");
+ logger.info("Epoch " + currentEpoch + " is complete.");
int nextEpoch = Integer.parseInt(currentEpoch) + 1;
- for(int i = 0; i < numPartitions; i++)
- collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "epoch"), i, Integer.toString(nextEpoch)));
+ for(int i = 0; i < numPartitions; i++) {
+ logger.info("Emitting next epoch - " + Integer.toString(i) + " -> " + Integer.toString(nextEpoch));
+ collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "epoch"), Integer.toString(i), Integer.toString(nextEpoch)));
+ }
+ this.store.put(CURRENT_EPOCH, Integer.toString(nextEpoch));
} else if(count > expectedKeys + 1) {
throw new IllegalStateException("Got " + count + " keys, which is more than the expected " + (expectedKeys + 1));
} else {
- out.println("Only found " + count + " valid keys, try again later.");
+ logger.info("Only found " + count + " valid keys, try again later.");
}
}
-  private void checkEpoch(String epoch) {
-    String curr = this.store.get(CURRENT_EPOCH);
-    if(curr == null)
-      this.store.put(CURRENT_EPOCH, epoch);
-    else if(!curr.equals(epoch))
-      throw new IllegalArgumentException("Got epoch " + epoch + " but have not yet completed " + curr);
-  }
-
+  /**
+   * Validates the epoch carried by an incoming message against the epoch recorded in
+   * the store under CURRENT_EPOCH.
+   *
+   * <ul>
+   *   <li>No epoch recorded yet: the message's epoch becomes the current one.</li>
+   *   <li>Message epoch equal to the recorded one: accepted.</li>
+   *   <li>Message epoch lower than the recorded one: logged and tolerated (presumably a
+   *   replayed message after a failure &mdash; TODO confirm). Note that process() still
+   *   writes this epoch for the message's key.</li>
+   *   <li>Message epoch higher than the recorded one: a new epoch arrived before the
+   *   current one completed, which is an error.</li>
+   * </ul>
+   *
+   * @param epoch the epoch string from the incoming message
+   * @throws IllegalArgumentException if {@code epoch} is ahead of the recorded epoch
+   * @throws NumberFormatException if either epoch is not an integer
+   */
+  private void checkEpoch(String epoch) {
+    String curr = this.store.get(CURRENT_EPOCH);
+    if(curr == null) {
+      this.store.put(CURRENT_EPOCH, epoch);
+      return;
+    }
+    int currentEpochInStore = Integer.parseInt(curr);
+    int currentEpochInMsg = Integer.parseInt(epoch);
+    if(currentEpochInMsg > currentEpochInStore) {
+      throw new IllegalArgumentException("Got epoch " + epoch + " but have not yet completed " + curr);
+    }
+    if(currentEpochInMsg < currentEpochInStore) {
+      logger.info("#### Ignoring received epoch = " + epoch + " less than what is in store " + curr);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
index f20bb7f..e958b51 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
@@ -32,15 +32,15 @@ import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.TaskCoordinator.RequestScope;
import org.apache.samza.task.WindowableTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
-/**
- * This job takes input from "epoch" and for each epoch emits "max" records of form
- * (key = counter, value = epoch-partition)
- *
- */
@SuppressWarnings("unchecked")
public class Emitter implements StreamTask, InitableTask, WindowableTask {
+ private static Logger logger = LoggerFactory.getLogger(Emitter.class);
+
private static String EPOCH = "the-epoch";
private static String COUNT = "the-count";
@@ -59,6 +59,8 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
if(envelope.getSystemStreamPartition().getStream().equals("epoch")) {
int newEpoch = Integer.parseInt((String) envelope.getMessage());
+ logger.info("New epoch in message - " + newEpoch);
+
Integer epoch = getInt(EPOCH);
if(epoch == null || newEpoch == epoch)
return;
@@ -66,7 +68,8 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
throw new IllegalArgumentException("Got new epoch " + newEpoch + " which is less than current epoch " + epoch);
// it's a new era, reset current epoch and count
- this.state.put(EPOCH, Integer.toString(epoch));
+ logger.info("Epoch: " + newEpoch);
+ this.state.put(EPOCH, Integer.toString(newEpoch));
this.state.put(COUNT, "0");
coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER);
}
@@ -80,15 +83,19 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
}
int counter = getInt(COUNT);
if(counter < max) {
- OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new SystemStream("kafka", "emitted"), Integer.toString(counter), epoch + "-" + taskName);
+ logger.info("Emitting: " + counter + ", epoch = " + epoch + ", task = " + taskName);
+ OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new SystemStream("kafka", "emitted"), Integer.toString(counter), epoch + "-" + taskName.toString());
collector.send(envelope);
this.state.put(COUNT, Integer.toString(getInt(COUNT) + 1));
- } else {
- trySleep(100);
}
+/* if(counter == max) {
+ logger.info("###### Committing because we finished emitting counter in this epoch");
+ coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER);
+ }*/
}
private void resetEpoch() {
+ logger.info("Resetting epoch to 0");
state.put(EPOCH, "0");
state.put(COUNT, "0");
}
@@ -97,13 +104,5 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
String value = this.state.get(key);
return value == null? null : Integer.parseInt(value);
}
-
- private void trySleep(long ms) {
- try {
- Thread.sleep(ms);
- } catch(Exception e) {
- e.printStackTrace();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/java/org/apache/samza/test/integration/join/EpochPartitioner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/EpochPartitioner.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/EpochPartitioner.java
index 670ccf9..438d77c 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/EpochPartitioner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/EpochPartitioner.java
@@ -27,6 +27,9 @@ public class EpochPartitioner implements Partitioner {
public EpochPartitioner(VerifiableProperties p){}
public int partition(Object key, int numParts) {
- return Integer.parseInt((String) key);
+ if(key instanceof Integer)
+ return (Integer) key;
+ else
+ return Integer.parseInt((String) key);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
index 299c1fa..cb30838 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
@@ -19,10 +19,12 @@
package org.apache.samza.test.integration.join;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -32,17 +34,24 @@ import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Int;
@SuppressWarnings("unchecked")
public class Joiner implements StreamTask, InitableTask {
+ private static Logger logger = LoggerFactory.getLogger(Joiner.class);
+
private KeyValueStore<String, String> store;
private int expected;
+ private TaskName taskName;
@Override
public void init(Config config, TaskContext context) {
this.store = (KeyValueStore<String, String>) context.getStore("joiner-state");
this.expected = config.getInt("num.partitions");
+ this.taskName = context.getTaskName();
}
@Override
@@ -51,22 +60,31 @@ public class Joiner implements StreamTask, InitableTask {
String value = (String) envelope.getMessage();
String[] pieces = value.split("-");
int epoch = Integer.parseInt(pieces[0]);
- int partition = Integer.parseInt(pieces[1]);
+
+ int partition = Integer.parseInt(pieces[1].split(" ")[1]);
Partitions partitions = loadPartitions(epoch, key);
- if(partitions.epoch != epoch) {
+ logger.info("Joiner got epoch = " + epoch + ", partition = " + partition + ", parts = " + partitions);
+ if(partitions.epoch < epoch) {
// we are in a new era
if(partitions.partitions.size() != expected)
throw new IllegalArgumentException("Should have " + expected + " partitions when new epoch starts.");
+ logger.info("Reseting epoch to " + epoch);
this.store.delete(key);
partitions.epoch = epoch;
partitions.partitions.clear();
partitions.partitions.add(partition);
+ } else if(partitions.epoch > epoch){
+ logger.info("Ignoring message for epoch " + epoch);
} else {
partitions.partitions.add(partition);
- if(partitions.partitions.size() == expected)
+ if(partitions.partitions.size() == expected) {
+ logger.info("Completed: " + key + " -> " + Integer.toString(epoch));
collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "completed-keys"), key, Integer.toString(epoch)));
+// logger.info("Completed key " + key + " for epoch " + epoch);
+ }
}
this.store.put(key, partitions.toString());
+ logger.info("Join store in Task " + this.taskName + " " + key + " -> " + partitions.toString());
}
private Partitions loadPartitions(int epoch, String key) {
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java
index b3efac5..7c82e0a 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java
@@ -19,17 +19,6 @@
package org.apache.samza.test.integration.join;
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
-
-import javax.mail.Message;
-import javax.mail.MessagingException;
-import javax.mail.Session;
-import javax.mail.Transport;
-import javax.mail.internet.InternetAddress;
-import javax.mail.internet.MimeMessage;
-
import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
@@ -38,29 +27,28 @@ import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class Watcher implements StreamTask, WindowableTask, InitableTask {
+
+ private static Logger logger = LoggerFactory.getLogger(Watcher.class);
private boolean inError = false;
private long lastEpochChange = System.currentTimeMillis();
private long maxTimeBetweenEpochsMs;
private int currentEpoch = 0;
- private String smtpHost;
- private String to;
- private String from;
@Override
public void init(Config config, TaskContext context) {
this.maxTimeBetweenEpochsMs = config.getLong("max.time.between.epochs.ms");
- this.smtpHost = config.get("mail.smtp.host");
- this.to = config.get("mail.to");
- this.from = config.get("mail.from");
}
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
int epoch = Integer.parseInt((String) envelope.getMessage());
if(epoch > currentEpoch) {
+ logger.info("Epoch changed to " + epoch + " from " + currentEpoch);
this.currentEpoch = epoch;
this.lastEpochChange = System.currentTimeMillis();
this.inError = false;
@@ -72,24 +60,8 @@ public class Watcher implements StreamTask, WindowableTask, InitableTask {
boolean isLagging = System.currentTimeMillis() - lastEpochChange > maxTimeBetweenEpochsMs;
if(!inError && isLagging) {
this.inError = true;
- sendEmail(from, to, "Job failed to make progress!", String.format("No epoch change for %d minutes.", this.maxTimeBetweenEpochsMs / (60*1000)));
- }
- }
-
- private void sendEmail(String from, String to, String subject, String body) {
- Properties props = new Properties();
- props.put("mail.smtp.host", smtpHost);
- Session session = Session.getInstance(props, null);
- try {
- MimeMessage msg = new MimeMessage(session);
- msg.setFrom(new InternetAddress(from));
- msg.addRecipients(Message.RecipientType.TO, InternetAddress.parse(to));
- msg.setSubject(subject);
- msg.setSentDate(new Date());
- msg.setText(body);
- Transport.send(msg);
- } catch (MessagingException e) {
- throw new RuntimeException(e);
+ logger.info("Error state detected, alerting...");
+ logger.error("Job failed to make progress!" + String.format("No epoch change for %d minutes.", this.maxTimeBetweenEpochsMs / (60*1000)));
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/python/samza_failure_testing.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/samza_failure_testing.py b/samza-test/src/main/python/samza_failure_testing.py
new file mode 100755
index 0000000..198db26
--- /dev/null
+++ b/samza-test/src/main/python/samza_failure_testing.py
@@ -0,0 +1,166 @@
+#################################################################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#################################################################################################################################
+
+import sys
+import paramiko
+import os.path
+import random
+import requests
+import logging
+from time import sleep
+from optparse import OptionParser
+
+#################################################################################################################################
+# A "chaos monkey"-like script that periodically kills parts of a Samza deployment: the Kafka broker and/or Samza containers.
+# This script depends on paramiko (an SSH library) and requests (an HTTP library).
+#################################################################################################################################
+
+# Create an ssh connection to the given host
+def connect(host):
+ client = paramiko.SSHClient()
+ client.load_system_host_keys()
+ client.set_missing_host_key_policy(paramiko.WarningPolicy())
+ client.connect(host)
+ return client
+
+# Run the given command line using the ssh connection (throw an error if anything on stderr)
+def execute(conn, cmd):
+ logging.info("Executing command {0}".format(cmd))
+ stdin, stdout, stderr = conn.exec_command(cmd)
+ output = stdout.read()
+ err = stderr.read()
+ if output:
+ logging.info(output)
+ return output
+ if err:
+ logging.error(err)
+ raise Exception("Error executing command: %s" %err)
+
+# Unfortunately pkill on mac seems to have a length limit which prevents it working with out epic java command line arguments
+def pkill(pid):
+ return "kill {0}".format(pid)
+
+def get_pid(pattern):
+ return "ps aux | grep " + pattern + " | grep -v 'grep' | awk -F' *' '{print $2}'"
+
+# Kill the kafka broker
+def kill_kafka(options):
+ connection = connect(options.kafka_host)
+ logging.info("Killing Kafka Broker ...")
+ kafka_pid = execute(connection, get_pid("kafka.Kafka"))
+ if kafka_pid is not None:
+ execute(connection, pkill(kafka_pid))
+ sleep(20)
+ logging.info("Restarting Kafka Broker...")
+ execute(connection, "nohup " + options.kafka_dir + "/bin/kafka-server-start.sh " + options.kafka_dir + "/config/server.properties > " + options.kafka_dir + "/logs/kafka.log 2>&1 &")
+ connection.close()
+ else:
+ logging.info("Could not determine Kafka broker process. Not doing anything")
+
+
+def query_yarn(host, port, query):
+ return requests.get("http://{0}:{1}/{2}".format(host, port, query))
+
+def get_app_ids_running(host, port):
+ logging.info("Querying RM for RUNNING application information")
+ list_apps_command = "ws/v1/cluster/apps?status=RUNNING"
+ response = query_yarn(host, port, list_apps_command).json()
+ if len(response) == 0 or not response['apps']:
+ raise Exception("Got an empty apps response back. Can't run kill script without Samza jobs running.")
+ apps = reduce(list.__add__, map(lambda x: list(x), response['apps'].values()))
+ appInfo = []
+ for app in apps:
+ appInfo.append((app['id'], app['name']))
+ return appInfo
+
+# Kill the samza container instances
+def kill_containers(hosts, app_id):
+ for host in hosts:
+ connection = connect(host)
+ pid = execute(connection, get_pid("samza-container-0 | grep {0}".format(app_id)))
+ if pid:
+ logging.info("Killing samza container on {0} with pid {1}".format(host, pid))
+ execute(connection, pkill(pid))
+ connection.close()
+ if pid is None:
+ logging.info("Couldn't find any container on the list of hosts. Nothing to kill :(")
+
+def require_arg(options, name):
+ if not hasattr(options, name) or getattr(options, name) is None:
+ print >> sys.stderr, "Missing required property:", name
+ sys.exit(1)
+
+# Command line options
+parser = OptionParser()
+parser.add_option("--node-list", dest="filename", help="A list of nodes in the YARN cluster", metavar="nodes.txt")
+parser.add_option("--kill-time", dest="kill_time", help="The time in seconds to sleep between", metavar="s", default=90)
+parser.add_option("--kafka-dir", dest="kafka_dir", help="The directory in which to find kafka", metavar="dir")
+parser.add_option("--kafka-host", dest="kafka_host", help="Host on which Kafka is installed", metavar="localhost")
+parser.add_option("--yarn-dir", dest="yarn_dir", help="The directory in which to find yarn", metavar="dir")
+parser.add_option("--kill-kafka", action="store_true", dest="kill_kafka", default=False, help="Should we kill Kafka?")
+parser.add_option("--kill-container", action="store_true", dest="kill_container", default=False, help="Should we kill Application Container?")
+parser.add_option("--yarn-host", dest="yarn_host", help="Host that will respond to Yarn REST queries ", metavar="localhost")
+
+(options, args) = parser.parse_args()
+
+kill_script_log_path = '/tmp/samza-kill-log.log'
+logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S', filename=kill_script_log_path, level=logging.INFO)
+console = logging.StreamHandler()
+formatter = logging.Formatter(fmt='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S')
+console.setFormatter(formatter)
+logging.getLogger('').addHandler(console)
+
+components = []
+app_info = {}
+if options.kill_container:
+ require_arg(options, 'filename')
+ require_arg(options, 'yarn_host')
+ require_arg(options, 'yarn_dir')
+ components.append("container")
+ hosts = [line.strip() for line in open(options.filename).readlines()]
+ if len(hosts) < 1:
+ print >> sys.stderr, "No hosts in host file."
+ sys.exit(1)
+ app_info = get_app_ids_running(options.yarn_host, 8088)
+
+if options.kill_kafka:
+ require_arg(options, 'kafka_host')
+ require_arg(options, 'kafka_dir')
+ components.append("kafka")
+
+if len(components) == 0:
+ parser.print_help()
+else:
+ while True:
+ kill_time = int(options.kill_time)
+ component_id = 0
+ if len(components) > 1:
+ component_id = random.randint(0, len(components) - 1)
+ kill_component = components[component_id]
+ if kill_component == "kafka":
+ logging.info("Choosing Kafka broker on {0}".format(options.kafka_host))
+ kill_kafka(options)
+ logging.info("Sleeping for {0}".format(kill_time * 2))
+ sleep(kill_time * 2)
+ elif kill_component == "container":
+ app_id_to_kill = random.randint(0, len(app_info) - 1)
+ logging.info("Choosing a Samza Container for {0}".format(app_info[app_id_to_kill]))
+ kill_containers(hosts, app_info[app_id_to_kill][0])
+ logging.info("Sleeping for {0}".format(kill_time))
+ sleep(kill_time)
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/resources/hello-stateful-world.samza
----------------------------------------------------------------------
diff --git a/samza-test/src/main/resources/hello-stateful-world.samza b/samza-test/src/main/resources/hello-stateful-world.samza
new file mode 100644
index 0000000..745f881
--- /dev/null
+++ b/samza-test/src/main/resources/hello-stateful-world.samza
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+# Class names need the org.apache.samza package prefix; the old `samza.` prefix
+# cannot be loaded.
+job.factory.class=org.apache.samza.job.local.ThreadJobFactory
+job.name=hello-stateful-world
+
+# Task
+task.class=org.apache.samza.test.integration.SimpleStatefulTask
+task.inputs=kafka.input
+
+# Stores
+# NOTE(review): this factory still uses the pre-Apache `samza.` package prefix; confirm the
+# correct org.apache.samza.storage.kv store factory class for this Samza version.
+stores.mystore.factory=samza.storage.kv.KeyValueStorageEngineFactory
+stores.mystore.key.serde=string
+stores.mystore.msg.serde=string
+stores.mystore.changelog=kafka.mystore
http://git-wip-us.apache.org/repos/asf/samza/blob/87c4ab65/samza-test/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/samza-test/src/main/resources/log4j.xml b/samza-test/src/main/resources/log4j.xml
index 2e54cc2..f93e4dd 100644
--- a/samza-test/src/main/resources/log4j.xml
+++ b/samza-test/src/main/resources/log4j.xml
@@ -12,6 +12,14 @@
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="RollingAppender" class="org.apache.log4j.DailyRollingFileAppender">
+ <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
+ <param name="DatePattern" value="'.'yyyy-MM-dd" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" />
+ </layout>
+ </appender>
+
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out" />
<layout class="org.apache.log4j.PatternLayout">
@@ -25,7 +33,8 @@
<root>
<priority value="info" />
- <appender-ref ref="console" />
+ <appender-ref ref="console" />
+ <appender-ref ref="RollingAppender" />
</root>
</log4j:configuration>
[2/2] samza git commit: SAMZA-577; SamzaContainerExceptionHandler should print exception to stderr
Posted by cr...@apache.org.
SAMZA-577; SamzaContainerExceptionHandler should print exception to stderr
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/04f7ff7b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/04f7ff7b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/04f7ff7b
Branch: refs/heads/master
Commit: 04f7ff7b4cee76a11991ac69b3eb60c9d4ede31f
Parents: 87c4ab6
Author: Chris Riccomini <cr...@apache.org>
Authored: Thu Feb 26 14:47:28 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Thu Feb 26 14:47:28 2015 -0800
----------------------------------------------------------------------
.../org/apache/samza/container/SamzaContainerExceptionHandler.scala | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/04f7ff7b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
index bbb094c..da4c098 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
@@ -29,6 +29,7 @@ import org.apache.samza.util.Logging
+/**
+ * Uncaught-exception handler: logs the exception via Logging.error, prints its stack
+ * trace to stderr, then invokes `exit`.
+ */
class SamzaContainerExceptionHandler(exit: () => Unit) extends UncaughtExceptionHandler with Logging {
def uncaughtException(t: Thread, e: Throwable) {
error("Uncaught exception in thread (name=%s). Exiting process now.".format(t.getName), e)
+    // SAMZA-577: also print the trace directly to stderr, not only through the logger.
+    e.printStackTrace(System.err)
exit()
}
}
\ No newline at end of file