You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/02/24 18:47:50 UTC
[1/7] samza git commit: SAMZA-547; add java serializable serde
Repository: samza
Updated Branches:
refs/heads/samza-sql 78d2fedb6 -> c8da3e3b5
SAMZA-547; add java serializable serde
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5e32a1bb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5e32a1bb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5e32a1bb
Branch: refs/heads/samza-sql
Commit: 5e32a1bb018cb0382ab4911c31554f8889de984f
Parents: 6743df3
Author: Ruslan Khafizov <ru...@gmail.com>
Authored: Thu Feb 12 14:43:31 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Thu Feb 12 14:43:31 2015 -0800
----------------------------------------------------------------------
.../samza/serializers/SerializableSerde.scala | 67 ++++++++++++++++++++
.../serializers/TestSerializableSerde.scala | 45 +++++++++++++
2 files changed, 112 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/5e32a1bb/samza-core/src/main/scala/org/apache/samza/serializers/SerializableSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/SerializableSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/SerializableSerde.scala
new file mode 100644
index 0000000..c43f863
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/SerializableSerde.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.io.ByteArrayInputStream
+import java.io.ByteArrayOutputStream
+import java.io.ObjectInputStream
+import java.io.ObjectOutputStream
+
+import org.apache.samza.config.Config
+
+/**
+ * A serde factory for java.io.Serializable objects (Java object serialization)
+ */
+class SerializableSerdeFactory[T <: java.io.Serializable] extends SerdeFactory[T] {
+ def getSerde(name: String, config: Config): Serde[T] =
+ new SerializableSerde[T]
+}
+
+class SerializableSerde[T <: java.io.Serializable] extends Serde[T] {
+ def toBytes(obj: T): Array[Byte] = if (obj != null) {
+ val bos = new ByteArrayOutputStream
+ val oos = new ObjectOutputStream(bos)
+
+ try {
+ oos.writeObject(obj)
+ }
+ finally {
+ oos.close()
+ }
+
+ bos.toByteArray
+ } else {
+ null
+ }
+
+ def fromBytes(bytes: Array[Byte]): T = if (bytes != null) {
+ val bis = new ByteArrayInputStream(bytes)
+ val ois = new ObjectInputStream(bis)
+
+ try {
+ ois.readObject.asInstanceOf[T]
+ }
+ finally{
+ ois.close()
+ }
+ } else {
+ null.asInstanceOf[T]
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/5e32a1bb/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
new file mode 100644
index 0000000..8e899d0
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.junit.Assert._
+import org.junit.Test
+
+class TestSerializableSerde {
+ @Test
+ def testSerializableSerde {
+ val serde = new SerializableSerde[String]
+ assertNull(serde.toBytes(null))
+ assertNull(serde.fromBytes(null))
+
+ val obj = "String is serializable"
+
+ // Serialized string is prefix + string itself
+ val prefix = Array(0xAC, 0xED, 0x00, 0x05, 0x74, 0x00, 0x16).map(_.toByte)
+ val expected = (prefix ++ obj.getBytes("UTF-8"))
+
+ val bytes = serde.toBytes(obj)
+
+ assertArrayEquals(expected, bytes)
+
+ val objRoundTrip:String = serde.fromBytes(bytes)
+ assertEquals(obj, objRoundTrip)
+ }
+}
[3/7] samza git commit: SAMZA-548;
add performance test for container with kafka consumer and producer
Posted by cr...@apache.org.
SAMZA-548; add performance test for container with kafka consumer and producer
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/41c74b96
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/41c74b96
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/41c74b96
Branch: refs/heads/samza-sql
Commit: 41c74b96876473acb6403e544bec7a00a04d2fa3
Parents: c7ac263
Author: Chris Riccomini <cr...@apache.org>
Authored: Fri Feb 13 17:10:03 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Fri Feb 13 17:10:03 2015 -0800
----------------------------------------------------------------------
samza-shell/src/main/bash/stat-yarn-job.sh | 21 +++++
.../src/main/config/negate-number.properties | 18 +----
.../kafka-read-write-performance.properties | 35 ++++++++
.../test/integration/NegateNumberTask.java | 44 +++++++++-
.../src/main/python/configs/downloads.json | 2 +-
samza-test/src/main/python/configs/kafka.json | 22 ++---
.../python/configs/smoke-tests/smoke-tests.json | 6 --
samza-test/src/main/python/configs/tests.json | 5 ++
samza-test/src/main/python/deployment.py | 21 ++---
.../src/main/python/samza_job_yarn_deployer.py | 47 ++++++++++-
samza-test/src/main/python/tests.py | 3 +-
.../src/main/python/tests/performance_tests.py | 80 +++++++++++++++++++
samza-test/src/main/python/tests/smoke_tests.py | 83 +++++++------------
samza-test/src/main/python/tests/util.py | 84 ++++++++++++++++++++
14 files changed, 359 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-shell/src/main/bash/stat-yarn-job.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/stat-yarn-job.sh b/samza-shell/src/main/bash/stat-yarn-job.sh
new file mode 100755
index 0000000..e5f6847
--- /dev/null
+++ b/samza-shell/src/main/bash/stat-yarn-job.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh org.apache.hadoop.yarn.client.cli.ApplicationCLI application -status "$@"
http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/config/negate-number.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/negate-number.properties b/samza-test/src/main/config/negate-number.properties
index 379fa61..b9f898c 100644
--- a/samza-test/src/main/config/negate-number.properties
+++ b/samza-test/src/main/config/negate-number.properties
@@ -21,18 +21,12 @@ job.name=samza-negate-number
# YARN
yarn.container.count=1
-yarn.container.memory.mb=1024
# Task
task.class=org.apache.samza.test.integration.NegateNumberTask
task.inputs=kafka.samza-test-topic
-task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.replication.factor=1
-task.checkpoint.system=kafka
-task.lifecycle.listener.generator.class=com.linkedin.samza.task.GeneratorLifecycleListenerFactory
-task.lifecycle.listener.generator.fabric=CORP-EAT1
-task.opts=-Xmx6g
-task.command.class=org.apache.samza.job.ShellCommandBuilder
+task.max.messages=50
+task.outputs=kafka.samza-test-topic-output
# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
@@ -43,12 +37,4 @@ systems.kafka.samza.msg.serde=string
systems.kafka.samza.key.serde=string
systems.kafka.samza.offset.default=oldest
systems.kafka.consumer.zookeeper.connect=localhost:2181/
-systems.kafka.producer.compression.type=gzip
systems.kafka.producer.bootstrap.servers=localhost:9092
-systems.kafka.producer.acks=1
-systems.kafka.producer.metadata.max.age.ms=86400000
-# Normally, we'd set this much higher, but we want things to look snappy in the demo.
-systems.kafka.producer.buffer.memory=1000000
-
-# negate-number
-streams.samza-test-topic.consumer.reset.offset=true
http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/config/perf/kafka-read-write-performance.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/perf/kafka-read-write-performance.properties b/samza-test/src/main/config/perf/kafka-read-write-performance.properties
new file mode 100644
index 0000000..122b14a
--- /dev/null
+++ b/samza-test/src/main/config/perf/kafka-read-write-performance.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=kafka-read-write-performance
+
+# YARN
+yarn.container.count=1
+
+# Task
+task.class=org.apache.samza.test.performance.TestPerformanceTask
+task.inputs=kafka.kafka-read-write-performance-input
+task.outputs=kafka.kafka-read-write-performance-output
+task.max.messages=1000000
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.offset.default=oldest
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
index 782e9f4..617cea6 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
@@ -19,22 +19,58 @@
package org.apache.samza.test.integration;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.TaskCoordinator.RequestScope;
+import org.apache.samza.util.Util;
-/*
- * A simple test job that reads strings, converts them to integers, multiplies
+/**
+ * A simple test job that reads strings, converts them to integers, multiplies
* by -1, and outputs to "samza-test-topic-output" stream.
*/
-public class NegateNumberTask implements StreamTask {
+public class NegateNumberTask implements StreamTask, InitableTask {
+ /**
+ * How many messages all the tasks in a single container have processed.
+ */
+ private static int messagesProcessed = 0;
+
+ /**
+ * How many messages to process before shutting down.
+ */
+ private int maxMessages;
+
+ /**
+ * The SystemStream to send negated numbers to.
+ */
+ private SystemStream outputSystemStream;
+
+ @Override
+ public void init(Config config, TaskContext context) throws Exception {
+ maxMessages = config.getInt("task.max.messages", 50);
+ String outputSystemStreamString = config.get("task.outputs", null);
+ if (outputSystemStreamString == null) {
+ throw new ConfigException("Missing required configuration: task.outputs");
+ }
+ outputSystemStream = Util.getSystemStreamFromNames(outputSystemStreamString);
+ }
+
+ @Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+ messagesProcessed += 1;
String input = (String) envelope.getMessage();
Integer number = Integer.valueOf(input);
Integer output = number.intValue() * -1;
- collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "samza-test-topic-output"), output.toString()));
+ collector.send(new OutgoingMessageEnvelope(outputSystemStream, output.toString()));
+ if (messagesProcessed >= maxMessages) {
+ coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/configs/downloads.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/downloads.json b/samza-test/src/main/python/configs/downloads.json
index 8ded306..a75756f 100644
--- a/samza-test/src/main/python/configs/downloads.json
+++ b/samza-test/src/main/python/configs/downloads.json
@@ -1,5 +1,5 @@
{
- "url_kafka": "http://www.us.apache.org/dist/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz",
+ "url_kafka": "http://www.us.apache.org/dist/kafka/0.8.2.0/kafka_2.9.2-0.8.2.0.tgz",
"url_zookeeper": "http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz",
"url_hadoop": "https://archive.apache.org/dist/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz"
}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/configs/kafka.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/kafka.json b/samza-test/src/main/python/configs/kafka.json
index 9a7af19..ab2f346 100644
--- a/samza-test/src/main/python/configs/kafka.json
+++ b/samza-test/src/main/python/configs/kafka.json
@@ -3,21 +3,21 @@
"kafka_instance_0": "localhost"
},
"kafka_port": 9092,
- "kafka_start_cmd": "kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh -daemon kafka_2.9.2-0.8.1.1/config/server.properties",
- "kafka_stop_cmd": "kafka_2.9.2-0.8.1.1/bin/kafka-server-stop.sh",
+ "kafka_start_cmd": "kafka_2.9.2-0.8.2.0/bin/kafka-server-start.sh -daemon kafka_2.9.2-0.8.2.0/config/server.properties",
+ "kafka_stop_cmd": "kafka_2.9.2-0.8.2.0/bin/kafka-server-stop.sh",
"kafka_install_path": "deploy/kafka",
- "kafka_executable": "kafka_2.9.2-0.8.1.1.tgz",
+ "kafka_executable": "kafka_2.9.2-0.8.2.0.tgz",
"kafka_post_install_cmds": [
- "sed -i.bak 's/SIGINT/SIGTERM/g' kafka_2.9.2-0.8.1.1/bin/kafka-server-stop.sh",
- "sed -i.bak 's/^num\\.partitions *=.*/num.partitions=1/' kafka_2.9.2-0.8.1.1/config/server.properties",
- "sed -i.bak 's/.*log.dirs.*/log.dirs=data/g' kafka_2.9.2-0.8.1.1/config/server.properties"
+ "sed -i.bak 's/SIGINT/SIGTERM/g' kafka_2.9.2-0.8.2.0/bin/kafka-server-stop.sh",
+ "sed -i.bak 's/^num\\.partitions *=.*/num.partitions=1/' kafka_2.9.2-0.8.2.0/config/server.properties",
+ "sed -i.bak 's/.*log.dirs.*/log.dirs=data/g' kafka_2.9.2-0.8.2.0/config/server.properties"
],
"kafka_logs": [
"log-cleaner.log",
- "kafka_2.9.2-0.8.1.1/logs/controller.log",
- "kafka_2.9.2-0.8.1.1/logs/kafka-request.log",
- "kafka_2.9.2-0.8.1.1/logs/kafkaServer-gc.log",
- "kafka_2.9.2-0.8.1.1/logs/server.log",
- "kafka_2.9.2-0.8.1.1/logs/state-change.log"
+ "kafka_2.9.2-0.8.2.0/logs/controller.log",
+ "kafka_2.9.2-0.8.2.0/logs/kafka-request.log",
+ "kafka_2.9.2-0.8.2.0/logs/kafkaServer-gc.log",
+ "kafka_2.9.2-0.8.2.0/logs/server.log",
+ "kafka_2.9.2-0.8.2.0/logs/state-change.log"
]
}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json b/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json
deleted file mode 100644
index 65f8568..0000000
--- a/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json
+++ /dev/null
@@ -1,6 +0,0 @@
-{
- "samza_executable": "samza-test_2.10-0.9.0-SNAPSHOT.tgz",
- "samza_install_path": "deploy/smoke_tests",
- "samza_config_factory": "org.apache.samza.config.factories.PropertiesConfigFactory",
- "samza_config_file": "config/negate-number.properties"
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/configs/tests.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/tests.json b/samza-test/src/main/python/configs/tests.json
new file mode 100644
index 0000000..5251af9
--- /dev/null
+++ b/samza-test/src/main/python/configs/tests.json
@@ -0,0 +1,5 @@
+{
+ "samza_executable": "samza-test_2.10-0.9.0-SNAPSHOT.tgz",
+ "samza_install_path": "deploy/smoke_tests",
+ "samza_config_factory": "org.apache.samza.config.factories.PropertiesConfigFactory"
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/deployment.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/deployment.py b/samza-test/src/main/python/deployment.py
index a0e1481..89ba728 100644
--- a/samza-test/src/main/python/deployment.py
+++ b/samza-test/src/main/python/deployment.py
@@ -76,36 +76,25 @@ def setup_suite():
'hostname': host
})
- # Start the Samza jobs.
+ # Setup Samza job deployer.
samza_job_deployer = SamzaJobYarnDeployer({
+ 'config_factory': c('samza_config_factory'),
'yarn_site_template': c('yarn_site_template'),
'yarn_driver_configs': c('yarn_driver_configs'),
'yarn_nm_hosts': c('yarn_nm_hosts').values(),
'install_path': samza_install_path,
})
- samza_job_deployer.install('smoke_tests', {
+ samza_job_deployer.install('tests', {
'executable': c('samza_executable'),
})
- samza_job_deployer.start('negate_number', {
- 'package_id': 'smoke_tests',
- 'config_factory': c('samza_config_factory'),
- 'config_file': c('samza_config_file'),
- 'install_path': samza_install_path,
- })
+ runtime.set_deployer('samza_job_deployer', samza_job_deployer)
def teardown_suite():
- # Stop the samza jobs.
- samza_job_deployer.stop('negate_number', {
- 'package_id': 'smoke_tests',
- 'install_path': samza_install_path,
- })
-
- samza_job_deployer.uninstall('smoke_tests')
+ samza_job_deployer.uninstall('tests')
# Undeploy everything.
for name, deployer in deployers.iteritems():
for instance, host in c(name + '_hosts').iteritems():
deployer.undeploy(instance)
-
http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/samza_job_yarn_deployer.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/samza_job_yarn_deployer.py b/samza-test/src/main/python/samza_job_yarn_deployer.py
index e18bc58..38635ca 100644
--- a/samza-test/src/main/python/samza_job_yarn_deployer.py
+++ b/samza-test/src/main/python/samza_job_yarn_deployer.py
@@ -39,7 +39,7 @@ class SamzaJobYarnDeployer(Deployer):
to start and stop Samza jobs in a YARN grid.
param: configs -- Map of config key/values pairs. These configs will be used
- as a default whenever overrides are not provided in the methods (intall,
+ as a default whenever overrides are not provided in the methods (install,
start, stop, etc) below.
"""
logging.getLogger("paramiko").setLevel(logging.ERROR)
@@ -173,6 +173,47 @@ class SamzaJobYarnDeployer(Deployer):
p.wait()
assert p.returncode == 0, "Command returned non-zero exit code ({0}): {1}".format(p.returncode, command)
+ def await(self, job_id, configs={}):
+ """
+ Waits for a Samza job to finish using bin/stat-yarn-job.sh. A job is
+ finished when its "Final State" is not "UNDEFINED".
+
+ param: job_id -- A unique ID used to identify a Samza job.
+ param: configs -- Map of config key/values pairs. Valid keys include:
+
+ package_id: The package_id for the package that contains the code for job_id.
+ Usually, the package_id refers to the .tgz job tarball that contains the
+ code necessary to run job_id.
+ """
+ configs = self._get_merged_configs(configs)
+ self._validate_configs(configs, ['package_id'])
+
+ # Get configs.
+ package_id = configs.get('package_id')
+
+ # Get the application_id for the job.
+ application_id = self.app_ids.get(job_id)
+
+ # Stat the job if it has been started, or WARN and return if it hasn't.
+ final_state = 'UNDEFINED'
+ if not application_id:
+ logger.warn("Can't stat a job that was never started: {0}".format(job_id))
+ else:
+ command = "{0} {1}".format(os.path.join(package_id, "bin/stat-yarn-job.sh"), application_id)
+ env = self._get_env_vars(package_id)
+ while final_state == 'UNDEFINED':
+ p = Popen(command.split(' '), stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
+ output, err = p.communicate()
+ logger.debug("Output from run-job.sh:\nstdout: {0}\nstderr: {1}".format(output, err))
+ assert p.returncode == 0, "Command ({0}) returned non-zero exit code ({1}).\nstdout: {2}\nstderr: {3}".format(command, p.returncode, output, err)
+
+ # Check the final state for the job.
+ regex = r'.*Final.State . (\w*)'
+ match = re.match(regex, output.replace("\n", ' '))
+ final_state = match.group(1)
+ logger.debug("Got final state {0} for job_id {1}.".format(final_state, job_id))
+ return final_state
+
def uninstall(self, package_id, configs={}):
"""
Removes the install path for package_id from all remote hosts that it's been
@@ -201,6 +242,10 @@ class SamzaJobYarnDeployer(Deployer):
# TODO we should implement the below helper methods over time, as we need them.
+ def get_processes(self):
+ # TODO raise NotImplementedError
+ return []
+
def get_pid(self, container_id, configs={}):
raise NotImplementedError
http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/tests.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/tests.py b/samza-test/src/main/python/tests.py
index dae414e..df64e23 100644
--- a/samza-test/src/main/python/tests.py
+++ b/samza-test/src/main/python/tests.py
@@ -24,6 +24,7 @@ test = {
'perf_code': os.path.join(dir, 'perf.py'),
'configs_directory': os.path.join(dir, 'configs'),
'test_code': [
- os.path.join(dir, 'tests', 'smoke_tests.py')
+ os.path.join(dir, 'tests', 'smoke_tests.py'),
+ os.path.join(dir, 'tests', 'performance_tests.py'),
],
}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/tests/performance_tests.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/tests/performance_tests.py b/samza-test/src/main/python/tests/performance_tests.py
new file mode 100644
index 0000000..a97717f
--- /dev/null
+++ b/samza-test/src/main/python/tests/performance_tests.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import util
+import logging
+import zopkio.runtime as runtime
+from kafka import SimpleProducer, SimpleConsumer
+
+logger = logging.getLogger(__name__)
+
+JOB_ID = 'kafka-read-write-performance'
+PACKAGE_ID = 'tests'
+CONFIG_FILE = 'config/perf/kafka-read-write-performance.properties'
+TEST_INPUT_TOPIC = 'kafka-read-write-performance-input'
+TEST_OUTPUT_TOPIC = 'kafka-read-write-performance-output'
+NUM_MESSAGES = 1000000
+MESSAGE = 'a' * 200
+
+def test_kafka_read_write_performance():
+ """
+ Runs a Samza job that reads from Kafka, and writes back out to it. The
+ writes/sec for the job is logged to the job's container.
+ """
+ _load_data()
+ util.start_job(PACKAGE_ID, JOB_ID, CONFIG_FILE)
+ util.await_job(PACKAGE_ID, JOB_ID)
+
+def validate_kafka_read_write_performance():
+ """
+ Validates that all messages were sent to the output topic.
+ """
+ logger.info('Running validate_kafka_read_write_performance')
+ kafka = util.get_kafka_client()
+ kafka.ensure_topic_exists(TEST_OUTPUT_TOPIC)
+ consumer = SimpleConsumer(
+ kafka,
+ 'samza-test-group',
+ TEST_OUTPUT_TOPIC,
+ fetch_size_bytes=1000000,
+ buffer_size=32768,
+ max_buffer_size=None)
+ # wait 5 minutes to get all million messages
+ messages = consumer.get_messages(count=NUM_MESSAGES, block=True, timeout=300)
+ message_count = len(messages)
+ assert NUM_MESSAGES == message_count, 'Expected {0} lines, but found {1}'.format(NUM_MESSAGES, message_count)
+ kafka.close()
+
+def _load_data():
+ """
+ Sends 1 million messages to kafka-read-write-performance-input.
+ """
+ logger.info('Running test_kafka_read_write_performance')
+ kafka = util.get_kafka_client()
+ kafka.ensure_topic_exists(TEST_INPUT_TOPIC)
+ producer = SimpleProducer(
+ kafka,
+ req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
+ ack_timeout=30000,
+ batch_send=True,
+ batch_send_every_n=200)
+ logger.info('Loading {0} test messages.'.format(NUM_MESSAGES))
+ for i in range(0, NUM_MESSAGES):
+ if i % 100000 == 0:
+ logger.info('Loaded {0} messages.'.format(i))
+ producer.send_messages(TEST_INPUT_TOPIC, MESSAGE)
+ kafka.close()
http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/tests/smoke_tests.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/tests/smoke_tests.py b/samza-test/src/main/python/tests/smoke_tests.py
index 7aec4e0..53d5fa9 100644
--- a/samza-test/src/main/python/tests/smoke_tests.py
+++ b/samza-test/src/main/python/tests/smoke_tests.py
@@ -15,37 +15,29 @@
# specific language governing permissions and limitations
# under the License.
-import os
-import time
+import util
import logging
-import socket
-import errno
-from kafka import KafkaClient, SimpleProducer, SimpleConsumer
import zopkio.runtime as runtime
+from kafka import SimpleProducer, SimpleConsumer
logger = logging.getLogger(__name__)
-CWD = os.path.dirname(os.path.abspath(__file__))
-HOME_DIR = os.path.join(CWD, os.pardir)
-DATA_DIR = os.path.join(HOME_DIR, 'data')
-TEST_TOPIC = 'samza-test-topic'
+DEPLOYER = 'samza_job_deployer'
+JOB_ID = 'negate_number'
+PACKAGE_ID = 'tests'
+CONFIG_FILE = 'config/negate-number.properties'
+TEST_INPUT_TOPIC = 'samza-test-topic'
TEST_OUTPUT_TOPIC = 'samza-test-topic-output'
NUM_MESSAGES = 50
def test_samza_job():
"""
- Sends 50 messages (1 .. 50) to samza-test-topic.
+ Runs a job that reads input strings, converts them to integers, negates each
+ integer, and outputs the result to a Kafka topic.
"""
- logger.info('Running test_samza_job')
- kafka = _get_kafka_client()
- kafka.ensure_topic_exists(TEST_TOPIC)
- producer = SimpleProducer(kafka,
- async=False,
- req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
- ack_timeout=30000)
- for i in range(1, NUM_MESSAGES + 1):
- producer.send_messages(TEST_TOPIC, str(i))
- kafka.close()
+ _load_data()
+ util.start_job(PACKAGE_ID, JOB_ID, CONFIG_FILE)
+ util.await_job(PACKAGE_ID, JOB_ID)
def validate_samza_job():
"""
@@ -53,49 +45,28 @@ def validate_samza_job():
samza-test-topic-output.
"""
logger.info('Running validate_samza_job')
- kafka = _get_kafka_client()
+ kafka = util.get_kafka_client()
kafka.ensure_topic_exists(TEST_OUTPUT_TOPIC)
consumer = SimpleConsumer(kafka, 'samza-test-group', TEST_OUTPUT_TOPIC)
- messages = consumer.get_messages(count=NUM_MESSAGES, block=True, timeout=60)
+ messages = consumer.get_messages(count=NUM_MESSAGES, block=True, timeout=300)
message_count = len(messages)
assert NUM_MESSAGES == message_count, 'Expected {0} lines, but found {1}'.format(NUM_MESSAGES, message_count)
for message in map(lambda m: m.message.value, messages):
assert int(message) < 0 , 'Expected negative integer but received {0}'.format(message)
kafka.close()
-def _get_kafka_client(num_retries=20, retry_sleep=1):
- """
- Returns a KafkaClient based off of the kafka_hosts and kafka_port configs set
- in the active runtime.
- """
- kafka_hosts = runtime.get_active_config('kafka_hosts').values()
- kafka_port = runtime.get_active_config('kafka_port')
- assert len(kafka_hosts) > 0, 'Missing required configuration: kafka_hosts'
- connect_string = ','.join(map(lambda h: h + ':{0},'.format(kafka_port), kafka_hosts)).rstrip(',')
- # wait for at least one broker to come up
- if not _wait_for_server(kafka_hosts[0], kafka_port, 30):
- raise Exception('Unable to connect to Kafka broker: {0}:{1}'.format(kafka_hosts[0], kafka_port))
- return KafkaClient(connect_string)
-
-def _wait_for_server(host, port, timeout=5, retries=12):
+def _load_data():
"""
- Keep trying to connect to a host port until the retry count has been reached.
+ Sends 50 messages (1 .. 50) to samza-test-topic.
"""
- s = socket.socket()
-
- for i in range(retries):
- try:
- s.settimeout(timeout)
- s.connect((host, port))
- except socket.timeout, err:
- # Exception occurs if timeout is set. Wait and retry.
- pass
- except socket.error, err:
- # Exception occurs if timeout > underlying network timeout. Wait and retry.
- if type(err.args) != tuple or err[0] != errno.ETIMEDOUT:
- raise
- else:
- s.close()
- return True
- return False
-
+ logger.info('Running test_samza_job')
+ kafka = util.get_kafka_client()
+ kafka.ensure_topic_exists(TEST_INPUT_TOPIC)
+ producer = SimpleProducer(
+ kafka,
+ async=False,
+ req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
+ ack_timeout=30000)
+ for i in range(1, NUM_MESSAGES + 1):
+ producer.send_messages(TEST_INPUT_TOPIC, str(i))
+ kafka.close()
http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/tests/util.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/tests/util.py b/samza-test/src/main/python/tests/util.py
new file mode 100644
index 0000000..a0ed671
--- /dev/null
+++ b/samza-test/src/main/python/tests/util.py
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import socket
+import errno
+import zopkio.runtime as runtime
+from kafka import KafkaClient, SimpleProducer, SimpleConsumer
+from zopkio.runtime import get_active_config as c
+
+logger = logging.getLogger(__name__)
+
+DEPLOYER = 'samza_job_deployer'
+
+def start_job(package_id, job_id, config_file):
+ """
+ Start a Samza job.
+ """
+ logger.info('Starting {0}.{1}'.format(package_id, job_id))
+ samza_job_deployer = runtime.get_deployer(DEPLOYER)
+ samza_job_deployer.start(job_id, {
+ 'package_id': package_id,
+ 'config_file': config_file,
+ })
+
+def await_job(package_id, job_id):
+ """
+ Wait for a Samza job to finish.
+ """
+ logger.info('Awaiting {0}.{1}'.format(package_id, job_id))
+ samza_job_deployer = runtime.get_deployer(DEPLOYER)
+ samza_job_deployer.await(job_id, {
+ 'package_id': package_id,
+ })
+
+def get_kafka_client(num_retries=20, retry_sleep=1):
+ """
+ Returns a KafkaClient based off of the kafka_hosts and kafka_port configs set
+ in the active runtime.
+ """
+ kafka_hosts = runtime.get_active_config('kafka_hosts').values()
+ kafka_port = runtime.get_active_config('kafka_port')
+ assert len(kafka_hosts) > 0, 'Missing required configuration: kafka_hosts'
+ connect_string = ','.join(map(lambda h: h + ':{0},'.format(kafka_port), kafka_hosts)).rstrip(',')
+ # wait for at least one broker to come up
+ if not wait_for_server(kafka_hosts[0], kafka_port, 30):
+ raise Exception('Unable to connect to Kafka broker: {0}:{1}'.format(kafka_hosts[0], kafka_port))
+ return KafkaClient(connect_string)
+
+def wait_for_server(host, port, timeout=5, retries=12):
+ """
+ Keep trying to connect to a host port until the retry count has been reached.
+ """
+ s = socket.socket()
+
+ for i in range(retries):
+ try:
+ s.settimeout(timeout)
+ s.connect((host, port))
+ except socket.timeout, err:
+ # Exception occurs if timeout is set. Wait and retry.
+ pass
+ except socket.error, err:
+ # Exception occurs if timeout > underlying network timeout. Wait and retry.
+ if type(err.args) != tuple or err[0] != errno.ETIMEDOUT:
+ raise
+ else:
+ s.close()
+ return True
+ return False
[6/7] samza git commit: SAMZA-574: fix javadoc syntax error in
samza-log4j
Posted by cr...@apache.org.
SAMZA-574: fix javadoc syntax error in samza-log4j
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/77359feb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/77359feb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/77359feb
Branch: refs/heads/samza-sql
Commit: 77359feb83beec7c7ca66f9f1553565e3c826983
Parents: 11082d3
Author: Yan Fang <ya...@gmail.com>
Authored: Fri Feb 20 11:43:46 2015 -0800
Committer: Yan Fang <ya...@gmail.com>
Committed: Fri Feb 20 11:43:46 2015 -0800
----------------------------------------------------------------------
.../samza/logging/log4j/serializers/LoggingEventStringSerde.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/77359feb/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
index f9a0960..8d8f5e8 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
@@ -28,7 +28,7 @@ import org.apache.samza.serializers.Serde;
/**
* A serializer for LoggingEvent. It provides two methods. {@link #toBytes(LoggingEvent object)} serializes
- * the {@link @LoggingEvent}'s messages into bytes. {@link #fromBytes(byte[] bytes)} will creates a new
+ * the {@link org.apache.log4j.spi.LoggingEvent}'s messages into bytes. {@link #fromBytes(byte[] bytes)} will creates a new
* LoggingEvent based on the messages, which is deserialized from the bytes.
*/
public class LoggingEventStringSerde implements Serde<LoggingEvent> {
@@ -49,7 +49,7 @@ public class LoggingEventStringSerde implements Serde<LoggingEvent> {
}
/**
- * Convert bytes to a {@link LoggingEvent}. This LoggingEvent uses logging
+ * Convert bytes to a {@link org.apache.log4j.spi.LoggingEvent}. This LoggingEvent uses logging
* information of the {@link LoggingEventStringSerde}, which includes log
* name, log category and log level.
*
[4/7] samza git commit: SAMZA-554: Simplify serde configuration by
providing default serde names
Posted by cr...@apache.org.
SAMZA-554: Simplify serde configuration by providing default serde names
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9ea3a526
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9ea3a526
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9ea3a526
Branch: refs/heads/samza-sql
Commit: 9ea3a526a9de0b4bc74f476bd6d565cd192da3d0
Parents: 41c74b9
Author: Yan Fang <ya...@gmail.com>
Authored: Wed Feb 18 13:23:17 2015 -0800
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed Feb 18 13:23:17 2015 -0800
----------------------------------------------------------------------
.../apache/samza/container/SamzaContainer.scala | 27 ++++++++++++++++++--
.../samza/container/TestSamzaContainer.scala | 24 +++++++++++++++++
2 files changed, 49 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/9ea3a526/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index ac6e24f..2fc6c65 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -60,6 +60,7 @@ import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.config.JobConfig.Config2Job
import java.lang.Thread.UncaughtExceptionHandler
+import org.apache.samza.serializers._
object SamzaContainer extends Logging {
def main(args: Array[String]) {
@@ -107,6 +108,28 @@ object SamzaContainer extends Logging {
.readValue(Util.read(new URL(url)), classOf[JobModel])
}
+ /**
+  * Builds one of the built-in serdes from its short name ("byte", "integer",
+  * "json", "long", "serializable" or "string") by instantiating the matching
+  * serde factory and asking it for a serde.
+  *
+  * @param serdeName short name of the serde; also passed to the factory's getSerde
+  * @param exceptionSystemName name reported in the exception message when serdeName is unknown
+  * @param config configuration handed to the serde factory
+  * @throws SamzaException if serdeName is not one of the built-in names
+  */
+ def defaultSerdesFromSerdeName(serdeName: String, exceptionSystemName: String, config: Config) = {
+   info("looking for default serdes")
+   // Resolve the factory class first; unknown names fail here, before anything is instantiated.
+   val factoryClass: Class[_] = serdeName match {
+     case "byte" => classOf[ByteSerdeFactory]
+     case "integer" => classOf[IntegerSerdeFactory]
+     case "json" => classOf[JsonSerdeFactory]
+     case "long" => classOf[LongSerdeFactory]
+     case "serializable" => classOf[SerializableSerdeFactory[java.io.Serializable]]
+     case "string" => classOf[StringSerdeFactory]
+     case _ => throw new SamzaException("Serde %s for system %s does not exist in configuration." format (serdeName, exceptionSystemName))
+   }
+   val factory = Util.getObj[SerdeFactory[Object]](factoryClass.getCanonicalName)
+   val serde = factory.getSerde(serdeName, config)
+   info("use default serde %s for %s" format (serde, serdeName))
+   serde
+ }
+
def apply(containerModel: ContainerModel, config: Config) = {
val containerId = containerModel.getContainerId
val containerName = "samza-container-%s" format containerId
@@ -222,7 +245,7 @@ object SamzaContainer extends Logging {
.filter(getSerdeName(_).isDefined)
.map(systemName => {
val serdeName = getSerdeName(systemName).get
- val serde = serdes.getOrElse(serdeName, throw new SamzaException("Serde %s for system %s does not exist in configuration." format (serdeName, systemName)))
+ val serde = serdes.getOrElse(serdeName, defaultSerdesFromSerdeName(serdeName, systemName, config))
(systemName, serde)
}).toMap
}
@@ -235,7 +258,7 @@ object SamzaContainer extends Logging {
.filter(systemStream => getSerdeName(systemStream).isDefined)
.map(systemStream => {
val serdeName = getSerdeName(systemStream).get
- val serde = serdes.getOrElse(serdeName, throw new SamzaException("Serde %s for system %s does not exist in configuration." format (serdeName, systemStream)))
+ val serde = serdes.getOrElse(serdeName, defaultSerdesFromSerdeName(serdeName, systemStream.toString, config))
(systemStream, serde)
}).toMap
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9ea3a526/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 19ceeaa..81742bc 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -52,6 +52,8 @@ import org.junit.Assert._
import org.junit.Test
import org.scalatest.junit.AssertionsForJUnit
import java.lang.Thread.UncaughtExceptionHandler
+import org.apache.samza.serializers._
+import org.apache.samza.SamzaException
class TestSamzaContainer extends AssertionsForJUnit {
@Test
@@ -168,4 +170,26 @@ class TestSamzaContainer extends AssertionsForJUnit {
t.join
assertTrue(caughtException)
}
+
+ @Test
+ def testDefaultSerdesFromSerdeName {
+   import SamzaContainer._
+   val config = new MapConfig
+   // Each built-in serde name paired with the serde class it must produce.
+   val expectedSerdes: Seq[(String, Class[_])] = Seq(
+     "byte" -> classOf[ByteSerde],
+     "integer" -> classOf[IntegerSerde],
+     "json" -> classOf[JsonSerde],
+     "long" -> classOf[LongSerde],
+     "serializable" -> classOf[SerializableSerde[_]],
+     "string" -> classOf[StringSerde])
+   expectedSerdes.foreach {
+     case (serdeName, serdeClass) =>
+       assertTrue(serdeClass.isInstance(defaultSerdesFromSerdeName(serdeName, "testSystemException", config)))
+   }
+
+   // throw SamzaException if can not find the correct serde
+   val threwSamzaException =
+     try {
+       defaultSerdesFromSerdeName("otherName", "testSystemException", config)
+       false
+     } catch {
+       case _: SamzaException => true
+       case _: Exception => false
+     }
+   assertTrue(threwSamzaException)
+ }
}
[2/7] samza git commit: SAMZA-507;
shutdown container when threads fail with uncaught exceptions
Posted by cr...@apache.org.
SAMZA-507; shutdown container when threads fail with uncaught exceptions
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c7ac2637
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c7ac2637
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c7ac2637
Branch: refs/heads/samza-sql
Commit: c7ac26377debacbb94f9c5aac951827895c136a2
Parents: 5e32a1b
Author: Chris Riccomini <cr...@apache.org>
Authored: Fri Feb 13 13:12:14 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Fri Feb 13 13:12:14 2015 -0800
----------------------------------------------------------------------
.../apache/samza/container/SamzaContainer.scala | 10 ++++--
.../SamzaContainerExceptionHandler.scala | 34 ++++++++++++++++++
.../apache/samza/container/TaskInstance.scala | 4 ++-
.../samza/container/TestSamzaContainer.scala | 26 +++++++++++++-
.../TestSamzaContainerExceptionHandler.scala | 36 ++++++++++++++++++++
5 files changed, 106 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/c7ac2637/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 8a6d865..ac6e24f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -59,13 +59,19 @@ import java.net.URL
import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.config.JobConfig.Config2Job
+import java.lang.Thread.UncaughtExceptionHandler
object SamzaContainer extends Logging {
def main(args: Array[String]) {
- safeMain(() => new JmxServer)
+ safeMain(() => new JmxServer, new SamzaContainerExceptionHandler(() => System.exit(1)))
}
- def safeMain(newJmxServer: () => JmxServer) {
+ def safeMain(
+ newJmxServer: () => JmxServer,
+ exceptionHandler: UncaughtExceptionHandler = null) {
+ if (exceptionHandler != null) {
+ Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
+ }
putMDC("containerName", "samza-container-" + System.getenv(ShellCommandConfig.ENV_CONTAINER_ID))
// Break out the main method to make the JmxServer injectable so we can
// validate that we don't leak JMX non-daemon threads if we have an
http://git-wip-us.apache.org/repos/asf/samza/blob/c7ac2637/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
new file mode 100644
index 0000000..bbb094c
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import java.lang.Thread.UncaughtExceptionHandler
+import org.apache.samza.util.Logging
+
+/**
+ * UncaughtExceptionHandler that logs the failing thread's name together with
+ * the exception at error level and then invokes the supplied exit callback.
+ *
+ * @param exit callback run after the error has been logged
+ */
+class SamzaContainerExceptionHandler(exit: () => Unit) extends UncaughtExceptionHandler with Logging {
+  def uncaughtException(t: Thread, e: Throwable) {
+    error("Uncaught exception in thread (name=%s). Exiting process now." format t.getName, e)
+    exit()
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/c7ac2637/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 327299b..a583ff9 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -151,7 +151,9 @@ class TaskInstance(
metrics.commits.inc
- storageManager.flush
+ if (storageManager != null) {
+ storageManager.flush
+ }
trace("Flushing producers for taskName: %s" format taskName)
http://git-wip-us.apache.org/repos/asf/samza/blob/c7ac2637/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index acded7d..19ceeaa 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -20,7 +20,6 @@
package org.apache.samza.container
import scala.collection.JavaConversions._
-
import org.apache.samza.Partition
import org.apache.samza.config.Config
import org.apache.samza.config.MapConfig
@@ -52,6 +51,7 @@ import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
import org.junit.Assert._
import org.junit.Test
import org.scalatest.junit.AssertionsForJUnit
+import java.lang.Thread.UncaughtExceptionHandler
class TestSamzaContainer extends AssertionsForJUnit {
@Test
@@ -144,4 +144,28 @@ class TestSamzaContainer extends AssertionsForJUnit {
}
assertTrue(task.wasShutdown)
}
+
+ @Test
+ def testUncaughtExceptionHandler {
+   var caughtException = false
+   val exceptionHandler = new UncaughtExceptionHandler {
+     def uncaughtException(t: Thread, e: Throwable) {
+       caughtException = true
+     }
+   }
+   // safeMain installs the handler as the JVM-wide default. Remember whatever
+   // was installed before so this test does not leak its handler into others.
+   val previousHandler = Thread.getDefaultUncaughtExceptionHandler
+   try {
+     try {
+       SamzaContainer.safeMain(() => null, exceptionHandler)
+     } catch {
+       case _: Exception =>
+       // Expect some random exception from SamzaContainer because we haven't
+       // set any environment variables for container ID, etc.
+     }
+     // The failure above happened on this thread and was caught here, so the
+     // default handler must not have fired yet.
+     assertFalse(caughtException)
+     val t = new Thread(new Runnable {
+       def run = throw new RuntimeException("Uncaught exception in another thread. Catch this.")
+     })
+     t.start
+     t.join
+     assertTrue(caughtException)
+   } finally {
+     Thread.setDefaultUncaughtExceptionHandler(previousHandler)
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c7ac2637/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala
new file mode 100644
index 0000000..b1d100c
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import org.junit.Test
+import org.junit.Assert._
+import org.junit.Before
+import org.apache.samza.SamzaException
+import org.junit.After
+
+class TestSamzaContainerExceptionHandler {
+  /**
+   * The handler must invoke the exit callback it was constructed with when it
+   * is handed an uncaught exception.
+   */
+  @Test
+  def testShutdownProcess {
+    var exitInvoked = false
+    val markExit = () => exitInvoked = true
+    new SamzaContainerExceptionHandler(markExit).uncaughtException(Thread.currentThread, new SamzaException)
+    assertTrue(exitInvoked)
+  }
+}
[5/7] samza git commit: SAMZA-479: make StreamAppender pluggable for
different log formats
Posted by cr...@apache.org.
SAMZA-479: make StreamAppender pluggable for different log formats
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/11082d34
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/11082d34
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/11082d34
Branch: refs/heads/samza-sql
Commit: 11082d344d3059039fd285c0ef6752de02546977
Parents: 9ea3a52
Author: Yan Fang <ya...@gmail.com>
Authored: Wed Feb 18 18:11:37 2015 -0800
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed Feb 18 18:11:37 2015 -0800
----------------------------------------------------------------------
.../apache/samza/config/Log4jSystemConfig.java | 27 ++++++++
.../samza/logging/log4j/StreamAppender.java | 45 +++++++++++-
.../serializers/LoggingEventStringSerde.java | 72 ++++++++++++++++++++
.../LoggingEventStringSerdeFactory.java | 32 +++++++++
.../samza/config/TestLog4jSystemConfig.java | 23 +++++++
.../samza/logging/log4j/TestStreamAppender.java | 3 +
.../TestLoggingEventStringSerde.java | 44 ++++++++++++
7 files changed, 245 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index 659e3b6..107ddf0 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -22,6 +22,8 @@ package org.apache.samza.config;
import java.util.ArrayList;
import java.util.Map;
+import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
+
/**
* This class contains the methods for getting properties that are needed by the
* StreamAppender.
@@ -74,6 +76,31 @@ public class Log4jSystemConfig {
}
/**
+ * get the class name according to the serde name. If the serde name is "log4j" and
+ * the serde class is not configured, will use the default {@link LoggingEventStringSerdeFactory}
+ *
+ * @param name serde name
+ * @return serde factory name
+ */
+ public String getSerdeClass(String name) {
+   // Look up the factory class configured for this serde name.
+   String className = getValue(String.format(SerializerConfig.SERDE(), name));
+   // Fall back to the bundled factory only for the "log4j" serde name. The
+   // constant-first equals keeps a null name from throwing a NullPointerException.
+   if (className == null && "log4j".equals(name)) {
+     className = LoggingEventStringSerdeFactory.class.getCanonicalName();
+   }
+   return className;
+ }
+
+ /**
+  * Get the message serde name configured for a system.
+  *
+  * @param name system name substituted into the SystemConfig.MSG_SERDE() key pattern
+  * @return the value stored under that key, as returned by getValue
+  */
+ public String getSystemSerdeName(String name) {
+   return getValue(String.format(SystemConfig.MSG_SERDE(), name));
+ }
+
+ /**
+  * Get the message serde name configured for a single stream of a system.
+  *
+  * @param systemName system name substituted into the StreamConfig.MSG_SERDE() key pattern
+  * @param streamName stream name substituted into the same pattern
+  * @return the value stored under that key, as returned by getValue
+  */
+ public String getStreamSerdeName(String systemName, String streamName) {
+   return getValue(String.format(StreamConfig.MSG_SERDE(), systemName, streamName));
+ }
+
+ /**
* A helper method to get the value from the config. If the config does not
* contain the key, return null.
*
http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index 9a9d648..4ef3551 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -30,8 +30,12 @@ import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.Log4jSystemConfig;
import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.config.SystemConfig;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemFactory;
@@ -55,6 +59,7 @@ public class StreamAppender extends AppenderSkeleton {
private String key = null;
private String streamName = null;
private boolean isApplicationMaster = false;
+ private Serde<LoggingEvent> serde = null;
private Logger log = Logger.getLogger(StreamAppender.class);
/**
@@ -96,6 +101,8 @@ public class StreamAppender extends AppenderSkeleton {
throw new SamzaException("Please define log4j system name and factory class");
}
+ setSerde(log4jSystemConfig, systemName, streamName);
+
systemProducer = systemFactory.getProducer(systemName, config, new MetricsRegistryMap());
systemStream = new SystemStream(systemName, streamName);
systemProducer.register(SOURCE);
@@ -111,7 +118,7 @@ public class StreamAppender extends AppenderSkeleton {
try {
recursiveCall.set(true);
OutgoingMessageEnvelope outgoingMessageEnvelope =
- new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), subAppend(event).getBytes("UTF-8"));
+ new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), serde.toBytes(subLog(event)));
systemProducer.send(SOURCE, outgoingMessageEnvelope);
} catch (UnsupportedEncodingException e) {
throw new SamzaException("can not send the log messages", e);
@@ -129,6 +136,12 @@ public class StreamAppender extends AppenderSkeleton {
}
}
+ /**
+  * Copy a LoggingEvent, replacing its message with the output of
+  * subAppend(event) and carrying every other field over unchanged.
+  *
+  * @param event the event being appended
+  * @return a new LoggingEvent whose message is the subAppend result
+  */
+ private LoggingEvent subLog(LoggingEvent event) {
+   return new LoggingEvent(
+       event.getFQNOfLoggerClass(),
+       event.getLogger(),
+       event.getTimeStamp(),
+       event.getLevel(),
+       subAppend(event),
+       event.getThreadName(),
+       event.getThrowableInformation(),
+       event.getNDC(),
+       event.getLocationInformation(),
+       event.getProperties());
+ }
+
@Override
public void close() {
if (!this.closed) {
@@ -182,4 +195,34 @@ public class StreamAppender extends AppenderSkeleton {
String streamName = "__samza_" + jobName + "_" + jobId + "_logs";
return streamName.replace("-", "_");
}
+
+ /**
+  * set the serde for this appender. It looks for the stream serde name first, then the
+  * system serde name, and resolves that name to a serde factory class.
+  *
+  * @param log4jSystemConfig log4jSystemConfig for this appender
+  * @param systemName name of the system
+  * @param streamName name of the stream
+  * @throws SamzaException if no serde name is configured for the stream or the system,
+  *         or if no factory class can be found for the serde name
+  */
+ private void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName, String streamName) {
+   // The stream-level setting wins; fall back to the system-level one.
+   String serdeName = log4jSystemConfig.getStreamSerdeName(systemName, streamName);
+   if (serdeName == null) {
+     serdeName = log4jSystemConfig.getSystemSerdeName(systemName);
+   }
+
+   if (serdeName == null) {
+     throw new SamzaException("Missing serde name. Please specify the " + StreamConfig.MSG_SERDE() + " or " + SystemConfig.MSG_SERDE() + " property.");
+   }
+
+   String serdeClass = log4jSystemConfig.getSerdeClass(serdeName);
+   if (serdeClass == null) {
+     // Name the exact property the user has to set, including the serde name.
+     throw new SamzaException("Can not find serializers class. Please specify serializers.registry." + serdeName + ".class property");
+   }
+
+   SerdeFactory<LoggingEvent> serdeFactory = Util.<SerdeFactory<LoggingEvent>> getObj(serdeClass);
+   // NOTE(review): the factory is handed systemName rather than serdeName as its name
+   // argument; kept as in the original - confirm which one is intended.
+   serde = serdeFactory.getSerde(systemName, config);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
new file mode 100644
index 0000000..f9a0960
--- /dev/null
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j.serializers;
+
+import java.io.UnsupportedEncodingException;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A serializer for LoggingEvent. It provides two methods. {@link #toBytes(LoggingEvent object)} serializes
+ * the {@link LoggingEvent}'s message into bytes. {@link #fromBytes(byte[] bytes)} creates a new
+ * LoggingEvent based on the message, which is deserialized from the bytes.
+ */
+public class LoggingEventStringSerde implements Serde<LoggingEvent> {
+  private static final String ENCODING = "UTF-8";
+  final Logger logger = Logger.getLogger(LoggingEventStringSerde.class);
+
+  /**
+   * Encode the message of a {@link LoggingEvent} as UTF-8 bytes. Only the message is
+   * serialized; no other field of the event is written.
+   *
+   * @param object event whose message is serialized, may be null
+   * @return the UTF-8 bytes of the message's toString(), or null when the event or its
+   *         message is null
+   */
+  @Override
+  public byte[] toBytes(LoggingEvent object) {
+    // An event without a message has nothing to encode; treat it like a null event
+    // instead of failing with a NullPointerException.
+    if (object == null || object.getMessage() == null) {
+      return null;
+    }
+    try {
+      return object.getMessage().toString().getBytes(ENCODING);
+    } catch (UnsupportedEncodingException e) {
+      throw new SamzaException("can not be encoded to byte[]", e);
+    }
+  }
+
+  /**
+   * Convert bytes to a {@link LoggingEvent}. This LoggingEvent uses logging
+   * information of the {@link LoggingEventStringSerde}, which includes log
+   * name, log category and log level.
+   *
+   * @param bytes bytes for decoding
+   * @return LoggingEvent a new LoggingEvent, or null when bytes is null
+   */
+  @Override
+  public LoggingEvent fromBytes(byte[] bytes) {
+    if (bytes == null) {
+      return null;
+    }
+    String log;
+    try {
+      log = new String(bytes, ENCODING);
+    } catch (UnsupportedEncodingException e) {
+      throw new SamzaException("can not decode to String", e);
+    }
+    // getLevel() is null unless a level was assigned to this exact logger; the
+    // effective (inherited) level is always set, so the event never carries a null level.
+    return new LoggingEvent(logger.getName(), logger, logger.getEffectiveLevel(), log, null);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerdeFactory.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerdeFactory.java
new file mode 100644
index 0000000..150c3e5
--- /dev/null
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerdeFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j.serializers;
+
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+
+/** {@link SerdeFactory} that supplies {@link LoggingEventStringSerde} instances for {@link LoggingEvent} messages. */
+public class LoggingEventStringSerdeFactory implements SerdeFactory<LoggingEvent> {
+ /** Returns a new serde on each call; {@code name} and {@code config} are not consulted. */
+ @Override
+ public Serde<LoggingEvent> getSerde(String name, Config config) {
+ final Serde<LoggingEvent> serde = new LoggingEventStringSerde();
+ return serde;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
index 64a1e70..16ccb45 100644
--- a/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
+++ b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.*;
import java.util.HashMap;
import java.util.Map;
+import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -64,4 +65,26 @@ public class TestLog4jSystemConfig {
exception.expect(ConfigException.class);
log4jSystemConfig.getSystemName();
}
+
+ @Test
+ public void testGetSerdeClass() {
+ // start from an empty config: no serde class has been registered explicitly
+ Log4jSystemConfig config = new Log4jSystemConfig(new MapConfig(new HashMap<String, String>()));
+
+ // the "log4j" serde name resolves to the string serde factory by default
+ String expectedFactory = LoggingEventStringSerdeFactory.class.getCanonicalName();
+ assertEquals(expectedFactory, config.getSerdeClass("log4j"));
+ // any other unregistered serde name has no class
+ assertNull(config.getSerdeClass("otherName"));
+ }
+
+ @Test
+ public void testGetSerdeName() {
+ // one serde name configured at system scope, another at stream scope
+ Map<String, String> settings = new HashMap<String, String>();
+ settings.put("systems.mockSystem.samza.msg.serde", "systemSerde");
+ settings.put("systems.mockSystem.streams.mockStream.samza.msg.serde", "streamSerde");
+ Log4jSystemConfig config = new Log4jSystemConfig(new MapConfig(settings));
+
+ // each lookup must return the value configured at its own scope
+ assertEquals("streamSerde", config.getStreamSerdeName("mockSystem", "mockStream"));
+ assertEquals("systemSerde", config.getSystemSerdeName("mockSystem"));
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
index 46e4b8c..3e4ddc9 100644
--- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
@@ -23,10 +23,12 @@ import static org.junit.Assert.*;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
import org.junit.Test;
public class TestStreamAppender {
@@ -64,6 +66,7 @@ public class TestStreamAppender {
Map<String, String> map = new HashMap<String, String>();
map.put("job.name", "log4jTest");
map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
+ map.put("systems.mock.samza.msg.serde", "log4j");
return new MapConfig(map);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/test/java/org/apache/samza/logging/log4j/serializers/TestLoggingEventStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/serializers/TestLoggingEventStringSerde.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/serializers/TestLoggingEventStringSerde.java
new file mode 100644
index 0000000..4a7aa68
--- /dev/null
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/serializers/TestLoggingEventStringSerde.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j.serializers;
+
+import static org.junit.Assert.*;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Test;
+
+public class TestLoggingEventStringSerde {
+ // Same charset LoggingEventStringSerde encodes with, so the expectations do not depend on the platform default.
+ private static final String ENCODING = "UTF-8";
+
+ /**
+ * Checks null handling in both directions and that the message text survives encoding and decoding.
+ */
+ @Test
+ public void test() throws Exception {
+ String testLog = "testing";
+ Logger logger = Logger.getLogger(TestLoggingEventStringSerde.class);
+ LoggingEvent log = new LoggingEvent(logger.getName(), logger, logger.getLevel(), testLog, null);
+ LoggingEventStringSerde loggingEventStringSerde = new LoggingEventStringSerde();
+
+ assertNull(loggingEventStringSerde.fromBytes(null));
+ assertNull(loggingEventStringSerde.toBytes(null));
+
+ byte[] expectedBytes = testLog.getBytes(ENCODING);
+ assertArrayEquals(expectedBytes, loggingEventStringSerde.toBytes(log));
+ // only the log messages are guaranteed to be equivalent
+ assertEquals(log.getMessage().toString(), loggingEventStringSerde.fromBytes(expectedBytes).getMessage().toString());
+ }
+}
[7/7] samza git commit: Merge branch 'master' into samza-sql
Posted by cr...@apache.org.
Merge branch 'master' into samza-sql
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c8da3e3b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c8da3e3b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c8da3e3b
Branch: refs/heads/samza-sql
Commit: c8da3e3b5269a69dd3760f163a091a3b8c918f9f
Parents: 78d2fed 77359fe
Author: Chris Riccomini <cr...@apache.org>
Authored: Tue Feb 24 09:39:09 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Tue Feb 24 09:39:09 2015 -0800
----------------------------------------------------------------------
.../apache/samza/container/SamzaContainer.scala | 37 ++++++++-
.../SamzaContainerExceptionHandler.scala | 34 ++++++++
.../apache/samza/container/TaskInstance.scala | 4 +-
.../samza/serializers/SerializableSerde.scala | 67 ++++++++++++++++
.../samza/container/TestSamzaContainer.scala | 50 +++++++++++-
.../TestSamzaContainerExceptionHandler.scala | 36 +++++++++
.../serializers/TestSerializableSerde.scala | 45 +++++++++++
.../apache/samza/config/Log4jSystemConfig.java | 27 +++++++
.../samza/logging/log4j/StreamAppender.java | 45 ++++++++++-
.../serializers/LoggingEventStringSerde.java | 72 +++++++++++++++++
.../LoggingEventStringSerdeFactory.java | 32 ++++++++
.../samza/config/TestLog4jSystemConfig.java | 23 ++++++
.../samza/logging/log4j/TestStreamAppender.java | 3 +
.../TestLoggingEventStringSerde.java | 44 ++++++++++
samza-shell/src/main/bash/stat-yarn-job.sh | 21 +++++
.../src/main/config/negate-number.properties | 18 +----
.../kafka-read-write-performance.properties | 35 ++++++++
.../test/integration/NegateNumberTask.java | 44 +++++++++-
.../src/main/python/configs/downloads.json | 2 +-
samza-test/src/main/python/configs/kafka.json | 22 ++---
.../python/configs/smoke-tests/smoke-tests.json | 6 --
samza-test/src/main/python/configs/tests.json | 5 ++
samza-test/src/main/python/deployment.py | 21 ++---
.../src/main/python/samza_job_yarn_deployer.py | 47 ++++++++++-
samza-test/src/main/python/tests.py | 3 +-
.../src/main/python/tests/performance_tests.py | 80 +++++++++++++++++++
samza-test/src/main/python/tests/smoke_tests.py | 83 +++++++------------
samza-test/src/main/python/tests/util.py | 84 ++++++++++++++++++++
28 files changed, 871 insertions(+), 119 deletions(-)
----------------------------------------------------------------------