You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/02/24 18:47:50 UTC

[1/7] samza git commit: SAMZA-547; add java serializable serde

Repository: samza
Updated Branches:
  refs/heads/samza-sql 78d2fedb6 -> c8da3e3b5


SAMZA-547; add java serializable serde


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5e32a1bb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5e32a1bb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5e32a1bb

Branch: refs/heads/samza-sql
Commit: 5e32a1bb018cb0382ab4911c31554f8889de984f
Parents: 6743df3
Author: Ruslan Khafizov <ru...@gmail.com>
Authored: Thu Feb 12 14:43:31 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Thu Feb 12 14:43:31 2015 -0800

----------------------------------------------------------------------
 .../samza/serializers/SerializableSerde.scala   | 67 ++++++++++++++++++++
 .../serializers/TestSerializableSerde.scala     | 45 +++++++++++++
 2 files changed, 112 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/5e32a1bb/samza-core/src/main/scala/org/apache/samza/serializers/SerializableSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/SerializableSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/SerializableSerde.scala
new file mode 100644
index 0000000..c43f863
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/SerializableSerde.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.io.ByteArrayInputStream
+import java.io.ByteArrayOutputStream
+import java.io.ObjectInputStream
+import java.io.ObjectOutputStream
+
+import org.apache.samza.config.Config
+
+/**
+ * A factory for a serde that uses Java serialization for java.io.Serializable objects
+ */
+class SerializableSerdeFactory[T <: java.io.Serializable] extends SerdeFactory[T] {
+  def getSerde(name: String, config: Config): Serde[T] =
+    new SerializableSerde[T]
+}
+
+class SerializableSerde[T <: java.io.Serializable] extends Serde[T] {
+  def toBytes(obj: T): Array[Byte] = if (obj != null) {
+    val bos = new ByteArrayOutputStream
+    val oos = new ObjectOutputStream(bos)
+
+    try {
+      oos.writeObject(obj)
+    }
+    finally {
+      oos.close()
+    }
+
+    bos.toByteArray
+  } else {
+    null
+  }
+
+  def fromBytes(bytes: Array[Byte]): T = if (bytes != null) {
+    val bis = new ByteArrayInputStream(bytes)
+    val ois = new ObjectInputStream(bis)
+
+    try {
+      ois.readObject.asInstanceOf[T]
+    }
+    finally{
+      ois.close()
+    }
+  } else {
+    null.asInstanceOf[T]
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/5e32a1bb/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
new file mode 100644
index 0000000..8e899d0
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.junit.Assert._
+import org.junit.Test
+
+class TestSerializableSerde {
+  @Test
+  def testSerializableSerde {
+    val serde = new SerializableSerde[String]
+    assertNull(serde.toBytes(null))
+    assertNull(serde.fromBytes(null))
+    
+    val obj = "String is serializable"
+
+    // Serialized string is prefix + string itself
+    val prefix = Array(0xAC, 0xED, 0x00, 0x05, 0x74, 0x00, 0x16).map(_.toByte)
+    val expected = (prefix ++ obj.getBytes("UTF-8"))
+    
+    val bytes = serde.toBytes(obj)
+
+    assertArrayEquals(expected, bytes)
+
+    val objRoundTrip:String = serde.fromBytes(bytes)
+    assertEquals(obj, objRoundTrip)
+  }
+}


[3/7] samza git commit: SAMZA-548; add performance test for container with kafka consumer and producer

Posted by cr...@apache.org.
SAMZA-548; add performance test for container with kafka consumer and producer


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/41c74b96
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/41c74b96
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/41c74b96

Branch: refs/heads/samza-sql
Commit: 41c74b96876473acb6403e544bec7a00a04d2fa3
Parents: c7ac263
Author: Chris Riccomini <cr...@apache.org>
Authored: Fri Feb 13 17:10:03 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Fri Feb 13 17:10:03 2015 -0800

----------------------------------------------------------------------
 samza-shell/src/main/bash/stat-yarn-job.sh      | 21 +++++
 .../src/main/config/negate-number.properties    | 18 +----
 .../kafka-read-write-performance.properties     | 35 ++++++++
 .../test/integration/NegateNumberTask.java      | 44 +++++++++-
 .../src/main/python/configs/downloads.json      |  2 +-
 samza-test/src/main/python/configs/kafka.json   | 22 ++---
 .../python/configs/smoke-tests/smoke-tests.json |  6 --
 samza-test/src/main/python/configs/tests.json   |  5 ++
 samza-test/src/main/python/deployment.py        | 21 ++---
 .../src/main/python/samza_job_yarn_deployer.py  | 47 ++++++++++-
 samza-test/src/main/python/tests.py             |  3 +-
 .../src/main/python/tests/performance_tests.py  | 80 +++++++++++++++++++
 samza-test/src/main/python/tests/smoke_tests.py | 83 +++++++------------
 samza-test/src/main/python/tests/util.py        | 84 ++++++++++++++++++++
 14 files changed, 359 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-shell/src/main/bash/stat-yarn-job.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/stat-yarn-job.sh b/samza-shell/src/main/bash/stat-yarn-job.sh
new file mode 100755
index 0000000..e5f6847
--- /dev/null
+++ b/samza-shell/src/main/bash/stat-yarn-job.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh org.apache.hadoop.yarn.client.cli.ApplicationCLI application -status "$@"

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/config/negate-number.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/negate-number.properties b/samza-test/src/main/config/negate-number.properties
index 379fa61..b9f898c 100644
--- a/samza-test/src/main/config/negate-number.properties
+++ b/samza-test/src/main/config/negate-number.properties
@@ -21,18 +21,12 @@ job.name=samza-negate-number
 
 # YARN
 yarn.container.count=1
-yarn.container.memory.mb=1024
 
 # Task
 task.class=org.apache.samza.test.integration.NegateNumberTask
 task.inputs=kafka.samza-test-topic
-task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.replication.factor=1
-task.checkpoint.system=kafka
-task.lifecycle.listener.generator.class=com.linkedin.samza.task.GeneratorLifecycleListenerFactory
-task.lifecycle.listener.generator.fabric=CORP-EAT1
-task.opts=-Xmx6g
-task.command.class=org.apache.samza.job.ShellCommandBuilder
+task.max.messages=50
+task.outputs=kafka.samza-test-topic-output
 
 # Serializers
 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
@@ -43,12 +37,4 @@ systems.kafka.samza.msg.serde=string
 systems.kafka.samza.key.serde=string
 systems.kafka.samza.offset.default=oldest
 systems.kafka.consumer.zookeeper.connect=localhost:2181/
-systems.kafka.producer.compression.type=gzip
 systems.kafka.producer.bootstrap.servers=localhost:9092
-systems.kafka.producer.acks=1
-systems.kafka.producer.metadata.max.age.ms=86400000
-# Normally, we'd set this much higher, but we want things to look snappy in the demo.
-systems.kafka.producer.buffer.memory=1000000
-
-# negate-number
-streams.samza-test-topic.consumer.reset.offset=true

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/config/perf/kafka-read-write-performance.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/perf/kafka-read-write-performance.properties b/samza-test/src/main/config/perf/kafka-read-write-performance.properties
new file mode 100644
index 0000000..122b14a
--- /dev/null
+++ b/samza-test/src/main/config/perf/kafka-read-write-performance.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=kafka-read-write-performance
+
+# YARN
+yarn.container.count=1
+
+# Task
+task.class=org.apache.samza.test.performance.TestPerformanceTask
+task.inputs=kafka.kafka-read-write-performance-input
+task.outputs=kafka.kafka-read-write-performance-output
+task.max.messages=1000000
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.offset.default=oldest
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
index 782e9f4..617cea6 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
@@ -19,22 +19,58 @@
 
 package org.apache.samza.test.integration;
 
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.InitableTask;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.TaskCoordinator.RequestScope;
+import org.apache.samza.util.Util;
 
-/*
- * A simple test job that reads strings, converts them to integers, multiplies 
+/**
+ * A simple test job that reads strings, converts them to integers, multiplies
  * by -1, and outputs to "samza-test-topic-output" stream.
  */
-public class NegateNumberTask implements StreamTask {
+public class NegateNumberTask implements StreamTask, InitableTask {
+  /**
+   * How many messages all the tasks in a single container have processed.
+   */
+  private static int messagesProcessed = 0;
+
+  /**
+   * How many messages to process before shutting down.
+   */
+  private int maxMessages;
+
+  /**
+   * The SystemStream to send negated numbers to.
+   */
+  private SystemStream outputSystemStream;
+
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    maxMessages = config.getInt("task.max.messages", 50);
+    String outputSystemStreamString = config.get("task.outputs", null);
+    if (outputSystemStreamString == null) {
+      throw new ConfigException("Missing required configuration: task.outputs");
+    }
+    outputSystemStream = Util.getSystemStreamFromNames(outputSystemStreamString);
+  }
+
+  @Override
   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+    messagesProcessed += 1;
     String input = (String) envelope.getMessage();
     Integer number = Integer.valueOf(input);
     Integer output = number.intValue() * -1;
-    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "samza-test-topic-output"), output.toString()));
+    collector.send(new OutgoingMessageEnvelope(outputSystemStream, output.toString()));
+    if (messagesProcessed >= maxMessages) {
+      coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/configs/downloads.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/downloads.json b/samza-test/src/main/python/configs/downloads.json
index 8ded306..a75756f 100644
--- a/samza-test/src/main/python/configs/downloads.json
+++ b/samza-test/src/main/python/configs/downloads.json
@@ -1,5 +1,5 @@
 {
-  "url_kafka": "http://www.us.apache.org/dist/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz",
+  "url_kafka": "http://www.us.apache.org/dist/kafka/0.8.2.0/kafka_2.9.2-0.8.2.0.tgz",
   "url_zookeeper": "http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz",
   "url_hadoop": "https://archive.apache.org/dist/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz"
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/configs/kafka.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/kafka.json b/samza-test/src/main/python/configs/kafka.json
index 9a7af19..ab2f346 100644
--- a/samza-test/src/main/python/configs/kafka.json
+++ b/samza-test/src/main/python/configs/kafka.json
@@ -3,21 +3,21 @@
     "kafka_instance_0": "localhost"
   },
   "kafka_port": 9092,
-  "kafka_start_cmd": "kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh -daemon kafka_2.9.2-0.8.1.1/config/server.properties",
-  "kafka_stop_cmd": "kafka_2.9.2-0.8.1.1/bin/kafka-server-stop.sh",
+  "kafka_start_cmd": "kafka_2.9.2-0.8.2.0/bin/kafka-server-start.sh -daemon kafka_2.9.2-0.8.2.0/config/server.properties",
+  "kafka_stop_cmd": "kafka_2.9.2-0.8.2.0/bin/kafka-server-stop.sh",
   "kafka_install_path": "deploy/kafka",
-  "kafka_executable": "kafka_2.9.2-0.8.1.1.tgz",
+  "kafka_executable": "kafka_2.9.2-0.8.2.0.tgz",
   "kafka_post_install_cmds": [
-    "sed -i.bak 's/SIGINT/SIGTERM/g' kafka_2.9.2-0.8.1.1/bin/kafka-server-stop.sh",
-    "sed -i.bak 's/^num\\.partitions *=.*/num.partitions=1/' kafka_2.9.2-0.8.1.1/config/server.properties",
-    "sed -i.bak 's/.*log.dirs.*/log.dirs=data/g' kafka_2.9.2-0.8.1.1/config/server.properties"
+    "sed -i.bak 's/SIGINT/SIGTERM/g' kafka_2.9.2-0.8.2.0/bin/kafka-server-stop.sh",
+    "sed -i.bak 's/^num\\.partitions *=.*/num.partitions=1/' kafka_2.9.2-0.8.2.0/config/server.properties",
+    "sed -i.bak 's/.*log.dirs.*/log.dirs=data/g' kafka_2.9.2-0.8.2.0/config/server.properties"
   ],
   "kafka_logs": [
     "log-cleaner.log",
-    "kafka_2.9.2-0.8.1.1/logs/controller.log",
-    "kafka_2.9.2-0.8.1.1/logs/kafka-request.log",
-    "kafka_2.9.2-0.8.1.1/logs/kafkaServer-gc.log",
-    "kafka_2.9.2-0.8.1.1/logs/server.log",
-    "kafka_2.9.2-0.8.1.1/logs/state-change.log"
+    "kafka_2.9.2-0.8.2.0/logs/controller.log",
+    "kafka_2.9.2-0.8.2.0/logs/kafka-request.log",
+    "kafka_2.9.2-0.8.2.0/logs/kafkaServer-gc.log",
+    "kafka_2.9.2-0.8.2.0/logs/server.log",
+    "kafka_2.9.2-0.8.2.0/logs/state-change.log"
   ]
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json b/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json
deleted file mode 100644
index 65f8568..0000000
--- a/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json
+++ /dev/null
@@ -1,6 +0,0 @@
-{
-  "samza_executable": "samza-test_2.10-0.9.0-SNAPSHOT.tgz",
-  "samza_install_path": "deploy/smoke_tests",
-  "samza_config_factory": "org.apache.samza.config.factories.PropertiesConfigFactory",
-  "samza_config_file": "config/negate-number.properties"
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/configs/tests.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/tests.json b/samza-test/src/main/python/configs/tests.json
new file mode 100644
index 0000000..5251af9
--- /dev/null
+++ b/samza-test/src/main/python/configs/tests.json
@@ -0,0 +1,5 @@
+{
+  "samza_executable": "samza-test_2.10-0.9.0-SNAPSHOT.tgz",
+  "samza_install_path": "deploy/smoke_tests",
+  "samza_config_factory": "org.apache.samza.config.factories.PropertiesConfigFactory"
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/deployment.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/deployment.py b/samza-test/src/main/python/deployment.py
index a0e1481..89ba728 100644
--- a/samza-test/src/main/python/deployment.py
+++ b/samza-test/src/main/python/deployment.py
@@ -76,36 +76,25 @@ def setup_suite():
         'hostname': host
       })
 
-  # Start the Samza jobs.
+  # Setup Samza job deployer.
   samza_job_deployer = SamzaJobYarnDeployer({
+    'config_factory': c('samza_config_factory'),
     'yarn_site_template': c('yarn_site_template'),
     'yarn_driver_configs': c('yarn_driver_configs'),
     'yarn_nm_hosts': c('yarn_nm_hosts').values(),
     'install_path': samza_install_path,
   })
 
-  samza_job_deployer.install('smoke_tests', {
+  samza_job_deployer.install('tests', {
     'executable': c('samza_executable'),
   })
 
-  samza_job_deployer.start('negate_number', {
-    'package_id': 'smoke_tests',
-    'config_factory': c('samza_config_factory'),
-    'config_file': c('samza_config_file'),
-    'install_path': samza_install_path,
-  })
+  runtime.set_deployer('samza_job_deployer', samza_job_deployer)
 
 def teardown_suite():
-  # Stop the samza jobs.
-  samza_job_deployer.stop('negate_number', {
-    'package_id': 'smoke_tests',
-    'install_path': samza_install_path,
-  })
-
-  samza_job_deployer.uninstall('smoke_tests')
+  samza_job_deployer.uninstall('tests')
 
   # Undeploy everything.
   for name, deployer in deployers.iteritems():
     for instance, host in c(name + '_hosts').iteritems():
       deployer.undeploy(instance)
-

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/samza_job_yarn_deployer.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/samza_job_yarn_deployer.py b/samza-test/src/main/python/samza_job_yarn_deployer.py
index e18bc58..38635ca 100644
--- a/samza-test/src/main/python/samza_job_yarn_deployer.py
+++ b/samza-test/src/main/python/samza_job_yarn_deployer.py
@@ -39,7 +39,7 @@ class SamzaJobYarnDeployer(Deployer):
     to start and stop Samza jobs in a YARN grid.
 
     param: configs -- Map of config key/values pairs. These configs will be used
-    as a default whenever overrides are not provided in the methods (intall, 
+    as a default whenever overrides are not provided in the methods (install, 
     start, stop, etc) below.
     """
     logging.getLogger("paramiko").setLevel(logging.ERROR)
@@ -173,6 +173,47 @@ class SamzaJobYarnDeployer(Deployer):
       p.wait()
       assert p.returncode == 0, "Command returned non-zero exit code ({0}): {1}".format(p.returncode, command)
 
+  def await(self, job_id, configs={}):
+    """
+    Waits for a Samza job to finish using bin/stat-yarn-job.sh. A job is 
+    finished when its "Final State" is not "UNDEFINED".
+
+    param: job_id -- A unique ID used to identify a Samza job.
+    param: configs -- Map of config key/values pairs. Valid keys include:
+
+    package_id: The package_id for the package that contains the code for job_id.
+    Usually, the package_id refers to the .tgz job tarball that contains the
+    code necessary to run job_id.
+    """
+    configs = self._get_merged_configs(configs)
+    self._validate_configs(configs, ['package_id'])
+
+    # Get configs.
+    package_id = configs.get('package_id')
+
+    # Get the application_id for the job.
+    application_id = self.app_ids.get(job_id)
+
+    # Stat the job, if it's been started, or WARN and return if it hasn't.
+    final_state = 'UNDEFINED'
+    if not application_id:
+      logger.warn("Can't stat a job that was never started: {0}".format(job_id))
+    else:
+      command = "{0} {1}".format(os.path.join(package_id, "bin/stat-yarn-job.sh"), application_id)
+      env = self._get_env_vars(package_id)
+      while final_state == 'UNDEFINED':
+        p = Popen(command.split(' '), stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
+        output, err = p.communicate()
+        logger.debug("Output from run-job.sh:\nstdout: {0}\nstderr: {1}".format(output, err))
+        assert p.returncode == 0, "Command ({0}) returned non-zero exit code ({1}).\nstdout: {2}\nstderr: {3}".format(command, p.returncode, output, err)
+
+        # Check the final state for the job.
+        regex = r'.*Final.State . (\w*)'
+        match = re.match(regex, output.replace("\n", ' '))
+        final_state = match.group(1)
+        logger.debug("Got final state {0} for job_id {1}.".format(final_state, job_id))
+    return final_state
+
   def uninstall(self, package_id, configs={}):
     """
     Removes the install path for package_id from all remote hosts that it's been
@@ -201,6 +242,10 @@ class SamzaJobYarnDeployer(Deployer):
 
   # TODO we should implement the below helper methods over time, as we need them.
 
+  def get_processes(self):
+    # TODO raise NotImplementedError
+    return []
+
   def get_pid(self, container_id, configs={}):
     raise NotImplementedError
 

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/tests.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/tests.py b/samza-test/src/main/python/tests.py
index dae414e..df64e23 100644
--- a/samza-test/src/main/python/tests.py
+++ b/samza-test/src/main/python/tests.py
@@ -24,6 +24,7 @@ test = {
   'perf_code': os.path.join(dir, 'perf.py'),
   'configs_directory': os.path.join(dir, 'configs'),
   'test_code': [
-    os.path.join(dir, 'tests', 'smoke_tests.py')
+    os.path.join(dir, 'tests', 'smoke_tests.py'),
+    os.path.join(dir, 'tests', 'performance_tests.py'),
   ],
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/tests/performance_tests.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/tests/performance_tests.py b/samza-test/src/main/python/tests/performance_tests.py
new file mode 100644
index 0000000..a97717f
--- /dev/null
+++ b/samza-test/src/main/python/tests/performance_tests.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import util
+import logging
+import zopkio.runtime as runtime
+from kafka import SimpleProducer, SimpleConsumer
+
+logger = logging.getLogger(__name__)
+
+JOB_ID = 'kafka-read-write-performance'
+PACKAGE_ID = 'tests'
+CONFIG_FILE = 'config/perf/kafka-read-write-performance.properties'
+TEST_INPUT_TOPIC = 'kafka-read-write-performance-input'
+TEST_OUTPUT_TOPIC = 'kafka-read-write-performance-output'
+NUM_MESSAGES = 1000000
+MESSAGE = 'a' * 200
+
+def test_kafka_read_write_performance():
+  """
+  Runs a Samza job that reads from Kafka, and writes back out to it. The 
+  writes/sec for the job is logged to the job's container.
+  """
+  _load_data()
+  util.start_job(PACKAGE_ID, JOB_ID, CONFIG_FILE)
+  util.await_job(PACKAGE_ID, JOB_ID)
+
+def validate_kafka_read_write_performance():
+  """
+  Validates that all messages were sent to the output topic.
+  """
+  logger.info('Running validate_kafka_read_write_performance')
+  kafka = util.get_kafka_client()
+  kafka.ensure_topic_exists(TEST_OUTPUT_TOPIC)
+  consumer = SimpleConsumer(
+    kafka, 
+    'samza-test-group', 
+    TEST_OUTPUT_TOPIC,
+    fetch_size_bytes=1000000,
+    buffer_size=32768,
+    max_buffer_size=None)
+  # wait 5 minutes to get all million messages
+  messages = consumer.get_messages(count=NUM_MESSAGES, block=True, timeout=300)
+  message_count = len(messages)
+  assert NUM_MESSAGES == message_count, 'Expected {0} lines, but found {1}'.format(NUM_MESSAGES, message_count)
+  kafka.close()
+
+def _load_data():
+  """
+  Sends 1 million messages to kafka-read-write-performance-input.
+  """
+  logger.info('Running test_kafka_read_write_performance')
+  kafka = util.get_kafka_client()
+  kafka.ensure_topic_exists(TEST_INPUT_TOPIC)
+  producer = SimpleProducer(
+    kafka,
+    req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
+    ack_timeout=30000,
+    batch_send=True,
+    batch_send_every_n=200)
+  logger.info('Loading {0} test messages.'.format(NUM_MESSAGES))
+  for i in range(0, NUM_MESSAGES):
+    if i % 100000 == 0:
+      logger.info('Loaded {0} messages.'.format(i))
+    producer.send_messages(TEST_INPUT_TOPIC, MESSAGE)
+  kafka.close()

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/tests/smoke_tests.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/tests/smoke_tests.py b/samza-test/src/main/python/tests/smoke_tests.py
index 7aec4e0..53d5fa9 100644
--- a/samza-test/src/main/python/tests/smoke_tests.py
+++ b/samza-test/src/main/python/tests/smoke_tests.py
@@ -15,37 +15,29 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import os
-import time
+import util
 import logging
-import socket
-import errno
-from kafka import KafkaClient, SimpleProducer, SimpleConsumer
 import zopkio.runtime as runtime
+from kafka import SimpleProducer, SimpleConsumer
 
 logger = logging.getLogger(__name__)
 
-CWD = os.path.dirname(os.path.abspath(__file__))
-HOME_DIR = os.path.join(CWD, os.pardir)
-DATA_DIR = os.path.join(HOME_DIR, 'data')
-TEST_TOPIC = 'samza-test-topic'
+DEPLOYER = 'samza_job_deployer'
+JOB_ID = 'negate_number'
+PACKAGE_ID = 'tests'
+CONFIG_FILE = 'config/negate-number.properties'
+TEST_INPUT_TOPIC = 'samza-test-topic'
 TEST_OUTPUT_TOPIC = 'samza-test-topic-output'
 NUM_MESSAGES = 50
 
 def test_samza_job():
   """
-  Sends 50 messages (1 .. 50) to samza-test-topic.
+  Runs a job that converts input strings to integers, negates the
+  integer, and outputs to a Kafka topic.
   """
-  logger.info('Running test_samza_job')
-  kafka = _get_kafka_client()
-  kafka.ensure_topic_exists(TEST_TOPIC)
-  producer = SimpleProducer(kafka,
-    async=False,
-    req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
-    ack_timeout=30000)
-  for i in range(1, NUM_MESSAGES + 1):
-    producer.send_messages(TEST_TOPIC, str(i))
-  kafka.close()
+  _load_data()
+  util.start_job(PACKAGE_ID, JOB_ID, CONFIG_FILE)
+  util.await_job(PACKAGE_ID, JOB_ID)
 
 def validate_samza_job():
   """
@@ -53,49 +45,28 @@ def validate_samza_job():
   samza-test-topic-output.
   """
   logger.info('Running validate_samza_job')
-  kafka = _get_kafka_client()
+  kafka = util.get_kafka_client()
   kafka.ensure_topic_exists(TEST_OUTPUT_TOPIC)
   consumer = SimpleConsumer(kafka, 'samza-test-group', TEST_OUTPUT_TOPIC)
-  messages = consumer.get_messages(count=NUM_MESSAGES, block=True, timeout=60)
+  messages = consumer.get_messages(count=NUM_MESSAGES, block=True, timeout=300)
   message_count = len(messages)
   assert NUM_MESSAGES == message_count, 'Expected {0} lines, but found {1}'.format(NUM_MESSAGES, message_count)
   for message in map(lambda m: m.message.value, messages):
     assert int(message) < 0 , 'Expected negative integer but received {0}'.format(message)
   kafka.close()
 
-def _get_kafka_client(num_retries=20, retry_sleep=1):
-  """
-  Returns a KafkaClient based off of the kafka_hosts and kafka_port configs set 
-  in the active runtime.
-  """
-  kafka_hosts = runtime.get_active_config('kafka_hosts').values()
-  kafka_port = runtime.get_active_config('kafka_port')
-  assert len(kafka_hosts) > 0, 'Missing required configuration: kafka_hosts'
-  connect_string = ','.join(map(lambda h: h + ':{0},'.format(kafka_port), kafka_hosts)).rstrip(',')
-  # wait for at least one broker to come up
-  if not _wait_for_server(kafka_hosts[0], kafka_port, 30):
-    raise Exception('Unable to connect to Kafka broker: {0}:{1}'.format(kafka_hosts[0], kafka_port))
-  return KafkaClient(connect_string)
-
-def _wait_for_server(host, port, timeout=5, retries=12):
+def _load_data():
   """
-  Keep trying to connect to a host port until the retry count has been reached.
+  Sends 50 messages (1 .. 50) to samza-test-topic.
   """
-  s = socket.socket()
-
-  for i in range(retries):
-    try:
-      s.settimeout(timeout)
-      s.connect((host, port))
-    except socket.timeout, err:
-      # Exception occurs if timeout is set. Wait and retry.
-      pass
-    except socket.error, err:
-      # Exception occurs if timeout > underlying network timeout. Wait and retry.
-      if type(err.args) != tuple or err[0] != errno.ETIMEDOUT:
-        raise
-    else:
-      s.close()
-      return True
-  return False
-
+  logger.info('Running test_samza_job')
+  kafka = util.get_kafka_client()
+  kafka.ensure_topic_exists(TEST_INPUT_TOPIC)
+  producer = SimpleProducer(
+    kafka,
+    async=False,
+    req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
+    ack_timeout=30000)
+  for i in range(1, NUM_MESSAGES + 1):
+    producer.send_messages(TEST_INPUT_TOPIC, str(i))
+  kafka.close()

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/tests/util.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/tests/util.py b/samza-test/src/main/python/tests/util.py
new file mode 100644
index 0000000..a0ed671
--- /dev/null
+++ b/samza-test/src/main/python/tests/util.py
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import socket
+import errno
+import zopkio.runtime as runtime
+from kafka import KafkaClient, SimpleProducer, SimpleConsumer
+from zopkio.runtime import get_active_config as c
+
+logger = logging.getLogger(__name__)
+
+DEPLOYER = 'samza_job_deployer'
+
+def start_job(package_id, job_id, config_file):
+  """
+  Start a Samza job.
+  """
+  logger.info('Starting {0}.{1}'.format(package_id, job_id))
+  samza_job_deployer = runtime.get_deployer(DEPLOYER)
+  samza_job_deployer.start(job_id, {
+    'package_id': package_id,
+    'config_file': config_file,
+  })
+
+def await_job(package_id, job_id):
+  """
+  Wait for a Samza job to finish.
+  """
+  logger.info('Awaiting {0}.{1}'.format(package_id, job_id))
+  samza_job_deployer = runtime.get_deployer(DEPLOYER)
+  samza_job_deployer.await(job_id, {
+    'package_id': package_id,
+  })
+
+def get_kafka_client(num_retries=20, retry_sleep=1):
+  """
+  Returns a KafkaClient based off of the kafka_hosts and kafka_port configs set 
+  in the active runtime.
+  """
+  kafka_hosts = runtime.get_active_config('kafka_hosts').values()
+  kafka_port = runtime.get_active_config('kafka_port')
+  assert len(kafka_hosts) > 0, 'Missing required configuration: kafka_hosts'
+  connect_string = ','.join(map(lambda h: h + ':{0},'.format(kafka_port), kafka_hosts)).rstrip(',')
+  # wait for at least one broker to come up
+  if not wait_for_server(kafka_hosts[0], kafka_port, 30):
+    raise Exception('Unable to connect to Kafka broker: {0}:{1}'.format(kafka_hosts[0], kafka_port))
+  return KafkaClient(connect_string)
+
+def wait_for_server(host, port, timeout=5, retries=12):
+  """
+  Keep trying to connect to a host port until the retry count has been reached.
+  """
+  s = socket.socket()
+
+  for i in range(retries):
+    try:
+      s.settimeout(timeout)
+      s.connect((host, port))
+    except socket.timeout, err:
+      # Exception occurs if timeout is set. Wait and retry.
+      pass
+    except socket.error, err:
+      # Exception occurs if timeout > underlying network timeout. Wait and retry.
+      if type(err.args) != tuple or err[0] != errno.ETIMEDOUT:
+        raise
+    else:
+      s.close()
+      return True
+  return False


[6/7] samza git commit: SAMZA-574: fix javadoc syntax error in samza-log4j

Posted by cr...@apache.org.
SAMZA-574: fix javadoc syntax error in samza-log4j


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/77359feb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/77359feb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/77359feb

Branch: refs/heads/samza-sql
Commit: 77359feb83beec7c7ca66f9f1553565e3c826983
Parents: 11082d3
Author: Yan Fang <ya...@gmail.com>
Authored: Fri Feb 20 11:43:46 2015 -0800
Committer: Yan Fang <ya...@gmail.com>
Committed: Fri Feb 20 11:43:46 2015 -0800

----------------------------------------------------------------------
 .../samza/logging/log4j/serializers/LoggingEventStringSerde.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/77359feb/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
index f9a0960..8d8f5e8 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
@@ -28,7 +28,7 @@ import org.apache.samza.serializers.Serde;
 
 /**
  * A serializer for LoggingEvent. It provides two methods. {@link #toBytes(LoggingEvent object)} serializes
- * the {@link @LoggingEvent}'s messages into bytes. {@link #fromBytes(byte[] bytes)} will creates a new
+ * the {@link org.apache.log4j.spi.LoggingEvent}'s messages into bytes. {@link #fromBytes(byte[] bytes)} will create a new
  * LoggingEvent based on the messages, which is deserialized from the bytes.
  */
 public class LoggingEventStringSerde implements Serde<LoggingEvent> {
@@ -49,7 +49,7 @@ public class LoggingEventStringSerde implements Serde<LoggingEvent> {
   }
 
   /**
-   * Convert bytes to a {@link LoggingEvent}. This LoggingEvent uses logging
+   * Convert bytes to a {@link org.apache.log4j.spi.LoggingEvent}. This LoggingEvent uses logging
    * information of the {@link LoggingEventStringSerde}, which includes log
    * name, log category and log level.
    *


[4/7] samza git commit: SAMZA-554: Simplify serde configuration by providing default serde names

Posted by cr...@apache.org.
SAMZA-554: Simplify serde configuration by providing default serde names


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9ea3a526
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9ea3a526
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9ea3a526

Branch: refs/heads/samza-sql
Commit: 9ea3a526a9de0b4bc74f476bd6d565cd192da3d0
Parents: 41c74b9
Author: Yan Fang <ya...@gmail.com>
Authored: Wed Feb 18 13:23:17 2015 -0800
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed Feb 18 13:23:17 2015 -0800

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala | 27 ++++++++++++++++++--
 .../samza/container/TestSamzaContainer.scala    | 24 +++++++++++++++++
 2 files changed, 49 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9ea3a526/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index ac6e24f..2fc6c65 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -60,6 +60,7 @@ import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
 import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.config.JobConfig.Config2Job
 import java.lang.Thread.UncaughtExceptionHandler
+import org.apache.samza.serializers._
 
 object SamzaContainer extends Logging {
   def main(args: Array[String]) {
@@ -107,6 +108,28 @@ object SamzaContainer extends Logging {
       .readValue(Util.read(new URL(url)), classOf[JobModel])
   }
 
+  /**
+   * A helper function which returns the system's default serde for the given
+   * serde name. If no default serde matches the name, it throws a SamzaException.
+   */
+  def defaultSerdesFromSerdeName(serdeName: String, exceptionSystemName: String, config: Config) = {
+    info("looking for default serdes")
+    def getSerde(serdeFactory: String) = {
+      Util.getObj[SerdeFactory[Object]](serdeFactory).getSerde(serdeName, config)
+    }
+    val serde = serdeName match {
+      case "byte" => getSerde(classOf[ByteSerdeFactory].getCanonicalName)
+      case "integer" => getSerde(classOf[IntegerSerdeFactory].getCanonicalName)
+      case "json" => getSerde(classOf[JsonSerdeFactory].getCanonicalName)
+      case "long" => getSerde(classOf[LongSerdeFactory].getCanonicalName)
+      case "serializable" => getSerde(classOf[SerializableSerdeFactory[java.io.Serializable]].getCanonicalName)
+      case "string" => getSerde(classOf[StringSerdeFactory].getCanonicalName)
+      case _ => throw new SamzaException("Serde %s for system %s does not exist in configuration." format (serdeName, exceptionSystemName))
+    }
+    info("use default serde %s for %s" format (serde, serdeName))
+    serde
+  }
+
   def apply(containerModel: ContainerModel, config: Config) = {
     val containerId = containerModel.getContainerId
     val containerName = "samza-container-%s" format containerId
@@ -222,7 +245,7 @@ object SamzaContainer extends Logging {
         .filter(getSerdeName(_).isDefined)
         .map(systemName => {
           val serdeName = getSerdeName(systemName).get
-          val serde = serdes.getOrElse(serdeName, throw new SamzaException("Serde %s for system %s does not exist in configuration." format (serdeName, systemName)))
+          val serde = serdes.getOrElse(serdeName, defaultSerdesFromSerdeName(serdeName, systemName, config))
           (systemName, serde)
         }).toMap
     }
@@ -235,7 +258,7 @@ object SamzaContainer extends Logging {
         .filter(systemStream => getSerdeName(systemStream).isDefined)
         .map(systemStream => {
           val serdeName = getSerdeName(systemStream).get
-          val serde = serdes.getOrElse(serdeName, throw new SamzaException("Serde %s for system %s does not exist in configuration." format (serdeName, systemStream)))
+          val serde = serdes.getOrElse(serdeName, defaultSerdesFromSerdeName(serdeName, systemStream.toString, config))
           (systemStream, serde)
         }).toMap
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/9ea3a526/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 19ceeaa..81742bc 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -52,6 +52,8 @@ import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.junit.AssertionsForJUnit
 import java.lang.Thread.UncaughtExceptionHandler
+import org.apache.samza.serializers._
+import org.apache.samza.SamzaException
 
 class TestSamzaContainer extends AssertionsForJUnit {
   @Test
@@ -168,4 +170,26 @@ class TestSamzaContainer extends AssertionsForJUnit {
     t.join
     assertTrue(caughtException)
   }
+
+  @Test
+  def testDefaultSerdesFromSerdeName {
+    import SamzaContainer._
+    val config = new MapConfig
+    assertTrue(defaultSerdesFromSerdeName("byte", "testSystemException", config).isInstanceOf[ByteSerde])
+    assertTrue(defaultSerdesFromSerdeName("integer", "testSystemException", config).isInstanceOf[IntegerSerde])
+    assertTrue(defaultSerdesFromSerdeName("json", "testSystemException", config).isInstanceOf[JsonSerde])
+    assertTrue(defaultSerdesFromSerdeName("long", "testSystemException", config).isInstanceOf[LongSerde])
+    assertTrue(defaultSerdesFromSerdeName("serializable", "testSystemException", config).isInstanceOf[SerializableSerde[java.io.Serializable @unchecked]])
+    assertTrue(defaultSerdesFromSerdeName("string", "testSystemException", config).isInstanceOf[StringSerde])
+
+    // throw SamzaException if can not find the correct serde
+    var throwSamzaException = false
+    try {
+      defaultSerdesFromSerdeName("otherName", "testSystemException", config)
+    } catch {
+      case e: SamzaException => throwSamzaException = true
+      case _: Exception =>
+    }
+    assertTrue(throwSamzaException)
+  }
 }


[2/7] samza git commit: SAMZA-507; shutdown container when threads fail with uncaught exceptions

Posted by cr...@apache.org.
SAMZA-507; shutdown container when threads fail with uncaught exceptions


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c7ac2637
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c7ac2637
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c7ac2637

Branch: refs/heads/samza-sql
Commit: c7ac26377debacbb94f9c5aac951827895c136a2
Parents: 5e32a1b
Author: Chris Riccomini <cr...@apache.org>
Authored: Fri Feb 13 13:12:14 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Fri Feb 13 13:12:14 2015 -0800

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala | 10 ++++--
 .../SamzaContainerExceptionHandler.scala        | 34 ++++++++++++++++++
 .../apache/samza/container/TaskInstance.scala   |  4 ++-
 .../samza/container/TestSamzaContainer.scala    | 26 +++++++++++++-
 .../TestSamzaContainerExceptionHandler.scala    | 36 ++++++++++++++++++++
 5 files changed, 106 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c7ac2637/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 8a6d865..ac6e24f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -59,13 +59,19 @@ import java.net.URL
 import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
 import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.config.JobConfig.Config2Job
+import java.lang.Thread.UncaughtExceptionHandler
 
 object SamzaContainer extends Logging {
   def main(args: Array[String]) {
-    safeMain(() => new JmxServer)
+    safeMain(() => new JmxServer, new SamzaContainerExceptionHandler(() => System.exit(1)))
   }
 
-  def safeMain(newJmxServer: () => JmxServer) {
+  def safeMain(
+    newJmxServer: () => JmxServer,
+    exceptionHandler: UncaughtExceptionHandler = null) {
+    if (exceptionHandler != null) {
+      Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
+    }
     putMDC("containerName", "samza-container-" + System.getenv(ShellCommandConfig.ENV_CONTAINER_ID))
     // Break out the main method to make the JmxServer injectable so we can
     // validate that we don't leak JMX non-daemon threads if we have an

http://git-wip-us.apache.org/repos/asf/samza/blob/c7ac2637/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
new file mode 100644
index 0000000..bbb094c
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerExceptionHandler.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import java.lang.Thread.UncaughtExceptionHandler
+import org.apache.samza.util.Logging
+
+/**
+ * An UncaughtExceptionHandler that simply shuts down when any thread throws
+ * an uncaught exception.
+ */
+class SamzaContainerExceptionHandler(exit: () => Unit) extends UncaughtExceptionHandler with Logging {
+  def uncaughtException(t: Thread, e: Throwable) {
+    error("Uncaught exception in thread (name=%s). Exiting process now.".format(t.getName), e)
+    exit()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c7ac2637/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 327299b..a583ff9 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -151,7 +151,9 @@ class TaskInstance(
 
     metrics.commits.inc
 
-    storageManager.flush
+    if (storageManager != null) {
+      storageManager.flush
+    }
 
     trace("Flushing producers for taskName: %s" format taskName)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/c7ac2637/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index acded7d..19ceeaa 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -20,7 +20,6 @@
 package org.apache.samza.container
 
 import scala.collection.JavaConversions._
-
 import org.apache.samza.Partition
 import org.apache.samza.config.Config
 import org.apache.samza.config.MapConfig
@@ -52,6 +51,7 @@ import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
 import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.junit.AssertionsForJUnit
+import java.lang.Thread.UncaughtExceptionHandler
 
 class TestSamzaContainer extends AssertionsForJUnit {
   @Test
@@ -144,4 +144,28 @@ class TestSamzaContainer extends AssertionsForJUnit {
     }
     assertTrue(task.wasShutdown)
   }
+
+  @Test
+  def testUncaughtExceptionHandler {
+    var caughtException = false
+    val exceptionHandler = new UncaughtExceptionHandler {
+      def uncaughtException(t: Thread, e: Throwable) {
+        caughtException = true
+      }
+    }
+    try {
+      SamzaContainer.safeMain(() => null, exceptionHandler)
+    } catch {
+      case _: Exception =>
+      // Expect some random exception from SamzaContainer because we haven't 
+      // set any environment variables for container ID, etc.
+    }
+    assertFalse(caughtException)
+    val t = new Thread(new Runnable {
+      def run = throw new RuntimeException("Uncaught exception in another thread. Catch this.")
+    })
+    t.start
+    t.join
+    assertTrue(caughtException)
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c7ac2637/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala
new file mode 100644
index 0000000..b1d100c
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainerExceptionHandler.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import org.junit.Test
+import org.junit.Assert._
+import org.junit.Before
+import org.apache.samza.SamzaException
+import org.junit.After
+
+class TestSamzaContainerExceptionHandler {
+  @Test
+  def testShutdownProcess {
+    var exitCalled = false
+    val exceptionHandler = new SamzaContainerExceptionHandler(() => exitCalled = true)
+    exceptionHandler.uncaughtException(Thread.currentThread, new SamzaException)
+    assertTrue(exitCalled)
+  }
+}


[5/7] samza git commit: SAMZA-479: make StreamAppender pluggable for different log formats

Posted by cr...@apache.org.
SAMZA-479: make StreamAppender pluggable for different log formats


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/11082d34
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/11082d34
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/11082d34

Branch: refs/heads/samza-sql
Commit: 11082d344d3059039fd285c0ef6752de02546977
Parents: 9ea3a52
Author: Yan Fang <ya...@gmail.com>
Authored: Wed Feb 18 18:11:37 2015 -0800
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed Feb 18 18:11:37 2015 -0800

----------------------------------------------------------------------
 .../apache/samza/config/Log4jSystemConfig.java  | 27 ++++++++
 .../samza/logging/log4j/StreamAppender.java     | 45 +++++++++++-
 .../serializers/LoggingEventStringSerde.java    | 72 ++++++++++++++++++++
 .../LoggingEventStringSerdeFactory.java         | 32 +++++++++
 .../samza/config/TestLog4jSystemConfig.java     | 23 +++++++
 .../samza/logging/log4j/TestStreamAppender.java |  3 +
 .../TestLoggingEventStringSerde.java            | 44 ++++++++++++
 7 files changed, 245 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index 659e3b6..107ddf0 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -22,6 +22,8 @@ package org.apache.samza.config;
 import java.util.ArrayList;
 import java.util.Map;
 
+import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
+
 /**
  * This class contains the methods for getting properties that are needed by the
  * StreamAppender.
@@ -74,6 +76,31 @@ public class Log4jSystemConfig {
   }
 
   /**
+   * Gets the class name for the given serde name. If the serde name is "log4j" and
+   * no serde class is configured, the default {@link LoggingEventStringSerdeFactory} is used.
+   * 
+   * @param name serde name
+   * @return serde factory name
+   */
+  public String getSerdeClass(String name) {
+    String className = getValue(String.format(SerializerConfig.SERDE(), name));
+    if (className == null && name.equals("log4j")) {
+      className = LoggingEventStringSerdeFactory.class.getCanonicalName();
+    }
+    return className;
+  }
+
+  public String getSystemSerdeName(String name) {
+    String systemSerdeNameConfig = String.format(SystemConfig.MSG_SERDE(), name);
+    return getValue(systemSerdeNameConfig);
+  }
+
+  public String getStreamSerdeName(String systemName, String streamName) {
+    String streamSerdeNameConfig = String.format(StreamConfig.MSG_SERDE(), systemName, streamName);
+    return getValue(streamSerdeNameConfig);
+  }
+
+  /**
    * A helper method to get the value from the config. If the config does not
    * contain the key, return null.
    *

http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index 9a9d648..4ef3551 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -30,8 +30,12 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.Log4jSystemConfig;
 import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.config.SystemConfig;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemFactory;
@@ -55,6 +59,7 @@ public class StreamAppender extends AppenderSkeleton {
   private String key = null;
   private String streamName = null;
   private boolean isApplicationMaster = false;
+  private Serde<LoggingEvent> serde = null;
   private Logger log = Logger.getLogger(StreamAppender.class);
 
   /**
@@ -96,6 +101,8 @@ public class StreamAppender extends AppenderSkeleton {
       throw new SamzaException("Please define log4j system name and factory class");
     }
 
+    setSerde(log4jSystemConfig, systemName, streamName);
+
     systemProducer = systemFactory.getProducer(systemName, config, new MetricsRegistryMap());
     systemStream = new SystemStream(systemName, streamName);
     systemProducer.register(SOURCE);
@@ -111,7 +118,7 @@ public class StreamAppender extends AppenderSkeleton {
       try {
         recursiveCall.set(true);
         OutgoingMessageEnvelope outgoingMessageEnvelope =
-            new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), subAppend(event).getBytes("UTF-8"));
+            new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), serde.toBytes(subLog(event)));
         systemProducer.send(SOURCE, outgoingMessageEnvelope);
       } catch (UnsupportedEncodingException e) {
         throw new SamzaException("can not send the log messages", e);
@@ -129,6 +136,12 @@ public class StreamAppender extends AppenderSkeleton {
     }
   }
 
+  private LoggingEvent subLog(LoggingEvent event) {
+    return new LoggingEvent(event.getFQNOfLoggerClass(), event.getLogger(), event.getTimeStamp(),
+        event.getLevel(), subAppend(event), event.getThreadName(), event.getThrowableInformation(),
+        event.getNDC(), event.getLocationInformation(), event.getProperties());
+  }
+
   @Override
   public void close() {
     if (!this.closed) {
@@ -182,4 +195,34 @@ public class StreamAppender extends AppenderSkeleton {
     String streamName = "__samza_" + jobName + "_" + jobId + "_logs";
     return streamName.replace("-", "_");
   }
+
+  /**
+   * Sets the serde for this appender. It looks for the stream serde first, then the system serde.
+   * If no serde can be resolved, a SamzaException is thrown.
+   *
+   * @param log4jSystemConfig log4jSystemConfig for this appender
+   * @param systemName name of the system
+   * @param streamName name of the stream
+   */
+  private void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName, String streamName) {
+    String serdeClass = null;
+    String serdeName = log4jSystemConfig.getStreamSerdeName(systemName, streamName);
+
+    if (serdeName == null) {
+      serdeName = log4jSystemConfig.getSystemSerdeName(systemName);
+    }
+
+    if (serdeName == null) {
+      throw new SamzaException("Missing serde name. Please specify the " + StreamConfig.MSG_SERDE() + " or " + SystemConfig.MSG_SERDE() + " property.");
+    }
+
+    serdeClass = log4jSystemConfig.getSerdeClass(serdeName);
+
+    if (serdeClass != null) {
+      SerdeFactory<LoggingEvent> serdeFactory = Util.<SerdeFactory<LoggingEvent>> getObj(serdeClass);
+      serde = serdeFactory.getSerde(systemName, config);
+    } else {
+      throw new SamzaException("Can not find serializers class. Please specify serializers.registry.s%.class property");
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
new file mode 100644
index 0000000..f9a0960
--- /dev/null
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j.serializers;
+
+import java.io.UnsupportedEncodingException;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A serializer for LoggingEvent. It provides two methods. {@link #toBytes(LoggingEvent object)} serializes
+ * the {@link LoggingEvent}'s message into bytes. {@link #fromBytes(byte[] bytes)} creates a new
+ * LoggingEvent from the message that is deserialized from the bytes.
+ */
+public class LoggingEventStringSerde implements Serde<LoggingEvent> {
+  final private String ENCODING = "UTF-8";
+  final Logger logger = Logger.getLogger(LoggingEventStringSerde.class);
+
+  @Override
+  public byte[] toBytes(LoggingEvent object) {
+    byte[] bytes = null;
+    if (object != null) {
+      try {
+        bytes = object.getMessage().toString().getBytes(ENCODING);
+      } catch (UnsupportedEncodingException e) {
+        throw new SamzaException("can not be encoded to byte[]", e);
+      }
+    }
+    return bytes;
+  }
+
+  /**
+   * Convert bytes to a {@link LoggingEvent}. This LoggingEvent uses logging
+   * information of the {@link LoggingEventStringSerde}, which includes log
+   * name, log category and log level.
+   *
+   * @param bytes bytes for decoding
+   * @return LoggingEvent a new LoggingEvent
+   */
+  @Override
+  public LoggingEvent fromBytes(byte[] bytes) {
+    if (bytes == null) {
+      return null;
+    }
+    String log;
+    try {
+      log = new String(bytes, ENCODING);
+    } catch (UnsupportedEncodingException e) {
+      throw new SamzaException("can not decode to String", e);
+    }
+    return new LoggingEvent(logger.getName(), logger, logger.getLevel(), log, null);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerdeFactory.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerdeFactory.java
new file mode 100644
index 0000000..150c3e5
--- /dev/null
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerdeFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j.serializers;
+
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+
+public class LoggingEventStringSerdeFactory implements SerdeFactory<LoggingEvent> {
+  @Override
+  public Serde<LoggingEvent> getSerde(String name, Config config) {
+    return new LoggingEventStringSerde();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
index 64a1e70..16ccb45 100644
--- a/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
+++ b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.*;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -64,4 +65,26 @@ public class TestLog4jSystemConfig {
     exception.expect(ConfigException.class);
     log4jSystemConfig.getSystemName();
   }
+
+  @Test
+  public void testGetSerdeClass() {
+    Map<String, String> map = new HashMap<String, String>();
+    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(new MapConfig(map));
+
+    // get the default serde
+    assertEquals(LoggingEventStringSerdeFactory.class.getCanonicalName(), log4jSystemConfig.getSerdeClass("log4j"));
+    // get null
+    assertNull(log4jSystemConfig.getSerdeClass("otherName"));
+  }
+
+  @Test
+  public void testGetSerdeName() {
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("systems.mockSystem.streams.mockStream.samza.msg.serde", "streamSerde");
+    map.put("systems.mockSystem.samza.msg.serde", "systemSerde");
+    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(new MapConfig(map));
+
+    assertEquals("streamSerde", log4jSystemConfig.getStreamSerdeName("mockSystem", "mockStream"));
+    assertEquals("systemSerde", log4jSystemConfig.getSystemSerdeName("mockSystem"));
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
index 46e4b8c..3e4ddc9 100644
--- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
@@ -23,10 +23,12 @@ import static org.junit.Assert.*;
 
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
 import org.junit.Test;
 
 public class TestStreamAppender {
@@ -64,6 +66,7 @@ public class TestStreamAppender {
       Map<String, String> map = new HashMap<String, String>();
       map.put("job.name", "log4jTest");
       map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
+      map.put("systems.mock.samza.msg.serde", "log4j");
       return new MapConfig(map);
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/test/java/org/apache/samza/logging/log4j/serializers/TestLoggingEventStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/serializers/TestLoggingEventStringSerde.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/serializers/TestLoggingEventStringSerde.java
new file mode 100644
index 0000000..4a7aa68
--- /dev/null
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/serializers/TestLoggingEventStringSerde.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j.serializers;
+
+import static org.junit.Assert.*;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Test;
+
+public class TestLoggingEventStringSerde {
+
+  @Test
+  public void test() {
+    String testLog = "testing";
+    Logger logger = Logger.getLogger(TestLoggingEventStringSerde.class);
+    LoggingEvent log = new LoggingEvent(logger.getName(), logger, logger.getLevel(), testLog, null);
+    LoggingEventStringSerde loggingEventStringSerde = new LoggingEventStringSerde();
+
+    assertNull(loggingEventStringSerde.fromBytes(null));
+    assertNull(loggingEventStringSerde.toBytes(null));
+
+    assertArrayEquals(testLog.getBytes(), loggingEventStringSerde.toBytes(log));
+    // only the log messages are guaranteed to be equivalent 
+    assertEquals(log.getMessage().toString(), loggingEventStringSerde.fromBytes(testLog.getBytes()).getMessage().toString());
+  }
+}


[7/7] samza git commit: Merge branch 'master' into samza-sql

Posted by cr...@apache.org.
Merge branch 'master' into samza-sql


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c8da3e3b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c8da3e3b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c8da3e3b

Branch: refs/heads/samza-sql
Commit: c8da3e3b5269a69dd3760f163a091a3b8c918f9f
Parents: 78d2fed 77359fe
Author: Chris Riccomini <cr...@apache.org>
Authored: Tue Feb 24 09:39:09 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Tue Feb 24 09:39:09 2015 -0800

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala | 37 ++++++++-
 .../SamzaContainerExceptionHandler.scala        | 34 ++++++++
 .../apache/samza/container/TaskInstance.scala   |  4 +-
 .../samza/serializers/SerializableSerde.scala   | 67 ++++++++++++++++
 .../samza/container/TestSamzaContainer.scala    | 50 +++++++++++-
 .../TestSamzaContainerExceptionHandler.scala    | 36 +++++++++
 .../serializers/TestSerializableSerde.scala     | 45 +++++++++++
 .../apache/samza/config/Log4jSystemConfig.java  | 27 +++++++
 .../samza/logging/log4j/StreamAppender.java     | 45 ++++++++++-
 .../serializers/LoggingEventStringSerde.java    | 72 +++++++++++++++++
 .../LoggingEventStringSerdeFactory.java         | 32 ++++++++
 .../samza/config/TestLog4jSystemConfig.java     | 23 ++++++
 .../samza/logging/log4j/TestStreamAppender.java |  3 +
 .../TestLoggingEventStringSerde.java            | 44 ++++++++++
 samza-shell/src/main/bash/stat-yarn-job.sh      | 21 +++++
 .../src/main/config/negate-number.properties    | 18 +----
 .../kafka-read-write-performance.properties     | 35 ++++++++
 .../test/integration/NegateNumberTask.java      | 44 +++++++++-
 .../src/main/python/configs/downloads.json      |  2 +-
 samza-test/src/main/python/configs/kafka.json   | 22 ++---
 .../python/configs/smoke-tests/smoke-tests.json |  6 --
 samza-test/src/main/python/configs/tests.json   |  5 ++
 samza-test/src/main/python/deployment.py        | 21 ++---
 .../src/main/python/samza_job_yarn_deployer.py  | 47 ++++++++++-
 samza-test/src/main/python/tests.py             |  3 +-
 .../src/main/python/tests/performance_tests.py  | 80 +++++++++++++++++++
 samza-test/src/main/python/tests/smoke_tests.py | 83 +++++++------------
 samza-test/src/main/python/tests/util.py        | 84 ++++++++++++++++++++
 28 files changed, 871 insertions(+), 119 deletions(-)
----------------------------------------------------------------------