Posted to commits@samza.apache.org by cr...@apache.org on 2014/07/31 23:48:46 UTC

[1/2] git commit: SAMZA-109; make task.opts easier to use.

Repository: incubator-samza
Updated Branches:
  refs/heads/master cc11e02dd -> 38d659b33


SAMZA-109; make task.opts easier to use.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/97476e32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/97476e32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/97476e32

Branch: refs/heads/master
Commit: 97476e322f048c578973e8249359d04a5d533678
Parents: cc11e02
Author: Chinmay Soman <ch...@gmail.com>
Authored: Thu Jul 31 14:20:31 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Thu Jul 31 14:20:31 2014 -0700

----------------------------------------------------------------------
 .../0.7.0/jobs/configuration-table.html         | 24 +++---
 .../documentation/0.7.0/jobs/configuration.md   |  4 +-
 .../documentation/0.7.0/jobs/job-runner.md      |  2 +-
 .../learn/documentation/0.7.0/jobs/packaging.md |  2 +-
 .../samza/job/local/LocalJobFactory.scala       | 87 --------------------
 .../samza/job/local/ProcessJobFactory.scala     | 83 +++++++++++++++++++
 .../samza/job/local/ThreadJobFactory.scala      | 65 +++++++++++++++
 samza-shell/src/main/bash/run-am.sh             |  4 -
 samza-shell/src/main/bash/run-class.sh          | 42 ++++++++--
 samza-shell/src/main/bash/run-container.sh      |  4 -
 .../main/resources/hello-stateful-world.samsa   |  2 +-
 .../test/integration/TestStatefulTask.scala     | 10 +--
 .../TestSamzaContainerPerformance.scala         |  7 +-
 13 files changed, 208 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/97476e32/docs/learn/documentation/0.7.0/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/jobs/configuration-table.html b/docs/learn/documentation/0.7.0/jobs/configuration-table.html
index 41f334f..ea6b1ef 100644
--- a/docs/learn/documentation/0.7.0/jobs/configuration-table.html
+++ b/docs/learn/documentation/0.7.0/jobs/configuration-table.html
@@ -124,10 +124,14 @@
                         <a href="../api/javadocs/org/apache/samza/job/StreamJobFactory.html">StreamJobFactory</a>.
                         Samza ships with two implementations:
                         <dl>
-                            <dt><code>org.apache.samza.job.local.LocalJobFactory</code></dt>
-                            <dd>Runs your job on your local machine, either using threads or as a subprocess (see the
-                                <a href="#task-command-class" class="property">task.command.class</a> property for
-                                details). This is intended only for development, not for production deployments.</dd>
+                            <dt><code>org.apache.samza.job.local.ThreadJobFactory</code></dt>
+                            <dd>Runs your job on your local machine using threads. This is intended only for
+                                development, not for production deployments.</dd>
+                            <dt><code>org.apache.samza.job.local.ProcessJobFactory</code></dt>
+                            <dd>Runs your job on your local machine as a subprocess. An optional command builder
+                                property can also be specified (see <a href="#task-command-class" class="property">
+                                    task.command.class</a> for details). This is intended only for development,
+                                not for production deployments.</dd>
                             <dt><code>org.apache.samza.job.yarn.YarnJobFactory</code></dt>
                             <dd>Runs your job on a YARN grid. See <a href="#yarn">below</a> for YARN-specific configuration.</dd>
                         </dl>
@@ -282,13 +286,7 @@
                         The fully-qualified name of the Java class which determines the command line and environment
                         variables for a <a href="../container/samza-container.html">container</a>. It must be a subclass of
                         <a href="../api/javadocs/org/apache/samza/job/CommandBuilder.html">CommandBuilder</a>.
-                        Normally there is no need to change the default, with one exception: if you are using
-                        <code>LocalJobFactory</code> for <a href="#job-factory-class" class="property">job.factory.class</a>,
-                        and <code>task.command.class</code> is not set, the job is run using threads in the current JVM
-                        by default. Thus, any <a href="#task-opts" class="property">task.opts</a> you have configured
-                        will have no effect. If you want <code>LocalJobFactory</code> to use a subprocess rather than
-                        a thread-based executor, you must specify
-                        <code>task.command.class=org.apache.samza.job.ShellCommandBuilder</code> explicitly.
+                        This defaults to <code>task.command.class=org.apache.samza.job.ShellCommandBuilder</code>.
                     </td>
                 </tr>
 
@@ -303,9 +301,7 @@
                         <ul>
                             <li>If you set this property, the log configuration is disrupted. Please see
                             <a href="https://issues.apache.org/jira/browse/SAMZA-109">SAMZA-109</a> for a workaround.</li>
-                            <li>When running with <code>LocalJobFactory</code>, you need to also set
-                            <a href="#task-command-class" class="property">task.command.class</a>, otherwise the
-                            <code>task.opts</code> setting won't take effect.</li>
+                            <li>This cannot be used when running with <code>ThreadJobFactory</code>.</li>
                         </ul>
                     </td>
                 </tr>

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/97476e32/docs/learn/documentation/0.7.0/jobs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/jobs/configuration.md b/docs/learn/documentation/0.7.0/jobs/configuration.md
index 2ed3ea7..8bcc433 100644
--- a/docs/learn/documentation/0.7.0/jobs/configuration.md
+++ b/docs/learn/documentation/0.7.0/jobs/configuration.md
@@ -23,7 +23,7 @@ All Samza jobs have a configuration file that defines the job. A very basic conf
 
 {% highlight jproperties %}
 # Job
-job.factory.class=samza.job.local.LocalJobFactory
+job.factory.class=samza.job.local.ThreadJobFactory
 job.name=hello-world
 
 # Task
@@ -42,7 +42,7 @@ systems.example-system.samza.msg.serde=json
 
 There are four major sections to a configuration file:
 
-1. The job section defines things like the name of the job, and whether to use the YarnJobFactory or LocalJobFactory.
+1. The job section defines things like the name of the job, and whether to use the YarnJobFactory or ProcessJobFactory/ThreadJobFactory.
 2. The task section is where you specify the class name for your [StreamTask](../api/overview.html). It's also where you define what the [input streams](../container/streams.html) are for your task.
 3. The serializers section defines the classes of the [serdes](../container/serialization.html) used for serialization and deserialization of specific objects that are received and sent along different streams.
 4. The system section defines systems that your StreamTask can read from along with the types of serdes used for sending keys and messages from that system. Usually, you'll define a Kafka system, if you're reading from Kafka, although you can also specify your own self-implemented Samza-compatible systems. See the [hello-samza example project](/startup/hello-samza/0.7.0)'s Wikipedia system for a good example of a self-implemented system.

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/97476e32/docs/learn/documentation/0.7.0/jobs/job-runner.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/jobs/job-runner.md b/docs/learn/documentation/0.7.0/jobs/job-runner.md
index 55c9114..c2f6b09 100644
--- a/docs/learn/documentation/0.7.0/jobs/job-runner.md
+++ b/docs/learn/documentation/0.7.0/jobs/job-runner.md
@@ -37,7 +37,7 @@ public interface ConfigFactory {
 
 The Config object is just a wrapper around Map<String, String>, with some nice helper methods. Out of the box, Samza ships with the PropertiesConfigFactory, but developers can implement any kind of ConfigFactory they wish.
 
-Once the JobRunner gets your configuration, it gives your configuration to the StreamJobFactory class defined by the "job.factory" property. Samza ships with two job factory implementations: LocalJobFactory and YarnJobFactory. The StreamJobFactory's responsibility is to give the JobRunner a job that it can run.
+Once the JobRunner gets your configuration, it gives your configuration to the StreamJobFactory class defined by the "job.factory.class" property. Samza ships with three job factory implementations: ThreadJobFactory, ProcessJobFactory, and YarnJobFactory. The StreamJobFactory's responsibility is to give the JobRunner a job that it can run.
 
 {% highlight java %}
 public interface StreamJob {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/97476e32/docs/learn/documentation/0.7.0/jobs/packaging.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/jobs/packaging.md b/docs/learn/documentation/0.7.0/jobs/packaging.md
index f888f75..9e55f9a 100644
--- a/docs/learn/documentation/0.7.0/jobs/packaging.md
+++ b/docs/learn/documentation/0.7.0/jobs/packaging.md
@@ -19,7 +19,7 @@ title: Packaging
    limitations under the License.
 -->
 
-The [JobRunner](job-runner.html) page talks about run-job.sh, and how it's used to start a job either locally (LocalJobFactory) or with YARN (YarnJobFactory). In the diagram that shows the execution flow, it also shows a run-container.sh script. This script, along with a run-am.sh script, are what Samza actually calls to execute its code.
+The [JobRunner](job-runner.html) page talks about run-job.sh, and how it's used to start a job either locally (ProcessJobFactory/ThreadJobFactory) or with YARN (YarnJobFactory). In the diagram that shows the execution flow, it also shows a run-container.sh script. This script, along with a run-am.sh script, are what Samza actually calls to execute its code.
 
 ```
 bin/run-am.sh

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/97476e32/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
deleted file mode 100644
index 713bded..0000000
--- a/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.local
-import org.apache.samza.config.TaskConfig._
-import org.apache.samza.config.Config
-import org.apache.samza.config.ShellCommandConfig._
-import org.apache.samza.job.CommandBuilder
-import org.apache.samza.job.StreamJob
-import org.apache.samza.job.StreamJobFactory
-import grizzled.slf4j.Logging
-import org.apache.samza.SamzaException
-import org.apache.samza.container.{TaskNamesToSystemStreamPartitions, SamzaContainer}
-import org.apache.samza.util.Util
-import org.apache.samza.job.ShellCommandBuilder
-import scala.collection.JavaConversions._
-
-class LocalJobFactory extends StreamJobFactory with Logging {
-  def getJob(config: Config): StreamJob = {
-    val jobName = "local-container"
-
-    // Since we're local, there will only be a single task into which all the SSPs will be processed
-    val taskToTaskNames: Map[Int, TaskNamesToSystemStreamPartitions] = Util.assignContainerToSSPTaskNames(config, 1)
-    if(taskToTaskNames.size != 1) {
-      throw new SamzaException("Should only have a single task count but somehow got more " + taskToTaskNames.size)
-    }
-
-    // So pull out that single TaskNamesToSystemStreamPartitions
-    val sspTaskName: TaskNamesToSystemStreamPartitions = taskToTaskNames.getOrElse(0, throw new SamzaException("Should have a 0 task number for the SSPs but somehow do not: " + taskToTaskNames))
-    if (sspTaskName.size <= 0) {
-      throw new SamzaException("No SystemStreamPartitions to process were detected for your input streams. It's likely that the system(s) specified don't know about the input streams: %s" format config.getInputStreams)
-    }
-
-    val taskNameToChangeLogPartitionMapping = Util.getTaskNameToChangeLogPartitionMapping(config, taskToTaskNames).map(kv => kv._1 -> Integer.valueOf(kv._2))
-    info("got taskName for job %s" format sspTaskName)
-
-    config.getCommandClass match {
-      case Some(cmdBuilderClassName) => {
-        // A command class was specified, so we need to use a process job to
-        // execute the command in its own process.
-        val cmdBuilder = Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder]
-        
-        cmdBuilder
-          .setConfig(config)
-          .setName(jobName)
-          .setTaskNameToSystemStreamPartitionsMapping(sspTaskName.getJavaFriendlyType)
-
-        val processBuilder = new ProcessBuilder(cmdBuilder.buildCommand.split(" ").toList)
-
-        processBuilder
-          .environment
-          .putAll(cmdBuilder.buildEnvironment)
-
-        new ProcessJob(processBuilder)
-      }
-      case _ => {
-        info("No config specified for %s. Defaulting to ThreadJob, which is only meant for debugging." format COMMAND_BUILDER)
-
-        // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.
-        config.getTaskOpts match {
-          case Some(taskOpts) => warn("%s was specified in config, but is not being used because job is being executed with ThreadJob. You probably want to run %s=%s." format (TASK_JVM_OPTS, COMMAND_BUILDER, classOf[ShellCommandBuilder].getName))
-          case _ => None
-        }
-
-        // No command class was specified, so execute the job in this process
-        // using a threaded job.
-        new ThreadJob(SamzaContainer(jobName, sspTaskName, taskNameToChangeLogPartitionMapping, config))
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/97476e32/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
new file mode 100644
index 0000000..74b9367
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.local
+
+import org.apache.samza.container.{TaskNamesToSystemStreamPartitions, SamzaContainer}
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+import org.apache.samza.config.Config
+import org.apache.samza.config.TaskConfig._
+import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, StreamJobFactory}
+import org.apache.samza.util.Util
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import scala.collection.JavaConversions._
+
+/**
+ * Creates a stand alone ProcessJob with the specified config
+ */
+class ProcessJobFactory extends StreamJobFactory with Logging {
+  def getJob(config: Config): StreamJob = {
+    val jobName = "local-process-container"
+
+    // Since we're local, there will only be a single task into which all the SSPs will be processed
+    val taskToTaskNames: Map[Int, TaskNamesToSystemStreamPartitions] = Util.assignContainerToSSPTaskNames(config, 1)
+    if(taskToTaskNames.size != 1) {
+      throw new SamzaException("Should only have a single task count but somehow got more " + taskToTaskNames.size)
+    }
+
+    // So pull out that single TaskNamesToSystemStreamPartitions
+    val sspTaskName: TaskNamesToSystemStreamPartitions = taskToTaskNames.getOrElse(0, throw new SamzaException("Should have a 0 task number for the SSPs but somehow do not: " + taskToTaskNames))
+    if (sspTaskName.size <= 0) {
+      throw new SamzaException("No SystemStreamPartitions to process were detected for your input streams. It's likely that the system(s) specified don't know about the input streams: %s" format config.getInputStreams)
+    }
+
+    val taskNameToChangeLogPartitionMapping = Util.getTaskNameToChangeLogPartitionMapping(config, taskToTaskNames).map(kv => kv._1 -> Integer.valueOf(kv._2))
+    info("got taskName for job %s" format sspTaskName)
+
+    val commandBuilder : CommandBuilder = {
+      config.getCommandClass match {
+        case Some(cmdBuilderClassName) => {
+          // A command class was specified, so we need to use a process job to
+          // execute the command in its own process.
+          Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder]
+        }
+        case _ => {
+          info("Defaulting to ShellCommandBuilder")
+          new ShellCommandBuilder
+        }
+      }
+    }
+
+    commandBuilder
+      .setConfig(config)
+      .setName(jobName)
+      .setTaskNameToSystemStreamPartitionsMapping(sspTaskName.getJavaFriendlyType)
+      .setTaskNameToChangeLogPartitionMapping(taskNameToChangeLogPartitionMapping.map(kv => kv._1 -> Integer.valueOf(kv._2)).asJava)
+
+    val processBuilder = new ProcessBuilder(commandBuilder.buildCommand.split(" ").toList)
+
+    processBuilder
+      .environment
+      .putAll(commandBuilder.buildEnvironment)
+
+    new ProcessJob(processBuilder)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/97476e32/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
new file mode 100644
index 0000000..c0ea0af
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.local
+
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+import org.apache.samza.config.Config
+import org.apache.samza.config.ShellCommandConfig._
+import org.apache.samza.config.TaskConfig._
+import org.apache.samza.container.{TaskNamesToSystemStreamPartitions, SamzaContainer}
+import org.apache.samza.job.{StreamJob, StreamJobFactory}
+import org.apache.samza.util.Util
+import org.apache.samza.config.JobConfig._
+
+/**
+ * Creates a new Thread job with the given config
+ */
+class ThreadJobFactory extends StreamJobFactory with Logging {
+  def getJob(config: Config): StreamJob = {
+    val jobName = "local-thread-container"
+
+    // Since we're local, there will only be a single task into which all the SSPs will be processed
+    val taskToTaskNames: Map[Int, TaskNamesToSystemStreamPartitions] = Util.assignContainerToSSPTaskNames(config, 1)
+    if(taskToTaskNames.size != 1) {
+      throw new SamzaException("Should only have a single task count but somehow got more " + taskToTaskNames.size)
+    }
+
+    // So pull out that single TaskNamesToSystemStreamPartitions
+    val sspTaskName: TaskNamesToSystemStreamPartitions = taskToTaskNames.getOrElse(0, throw new SamzaException("Should have a 0 task number for the SSPs but somehow do not: " + taskToTaskNames))
+    if (sspTaskName.size <= 0) {
+      throw new SamzaException("No SystemStreamPartitions to process were detected for your input streams. It's likely that the system(s) specified don't know about the input streams: %s" format config.getInputStreams)
+    }
+
+    val taskNameToChangeLogPartitionMapping = Util.getTaskNameToChangeLogPartitionMapping(config, taskToTaskNames).map(kv => kv._1 -> Integer.valueOf(kv._2))
+    info("got taskName for job %s" format sspTaskName)
+    info("Creating a ThreadJob, which is only meant for debugging.")
+
+    // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.
+    config.getTaskOpts match {
+      case Some(taskOpts) => warn("%s was specified in config, but is not being used because job is being executed with ThreadJob. You probably want to run %s=%s." format (TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS, classOf[ProcessJobFactory].getName))
+      case _ => None
+    }
+
+    // ThreadJobFactory always executes the job in this process
+    // using a threaded job.
+    new ThreadJob(SamzaContainer(jobName, sspTaskName, taskNameToChangeLogPartitionMapping, config))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/97476e32/samza-shell/src/main/bash/run-am.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/run-am.sh b/samza-shell/src/main/bash/run-am.sh
index 878d938..c202596 100755
--- a/samza-shell/src/main/bash/run-am.sh
+++ b/samza-shell/src/main/bash/run-am.sh
@@ -16,8 +16,4 @@
 # specific language governing permissions and limitations
 # under the License.
 
-if [ -z "$SAMZA_LOG4J_CONFIG" ]; then
-  export SAMZA_LOG4J_CONFIG=file:$(dirname $0)/../lib/log4j.xml
-fi
-
 exec $(dirname $0)/run-class.sh org.apache.samza.job.yarn.SamzaAppMaster $@

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/97476e32/samza-shell/src/main/bash/run-class.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/run-class.sh b/samza-shell/src/main/bash/run-class.sh
index a75cbb9..8b04ed8 100755
--- a/samza-shell/src/main/bash/run-class.sh
+++ b/samza-shell/src/main/bash/run-class.sh
@@ -36,6 +36,7 @@ fi
 HADOOP_YARN_HOME="${HADOOP_YARN_HOME:-$HOME/.samza}"
 HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-$HADOOP_YARN_HOME/conf}"
 CLASSPATH=$HADOOP_CONF_DIR
+GC_LOG_ROTATION_OPTS="-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024"
 
 for file in $base_dir/lib/*.[jw]ar;
 do
@@ -60,17 +61,46 @@ fi
 mkdir -p $base_dir/tmp
 JAVA_TEMP_DIR=$base_dir/tmp
 
-if [ -z "$JAVA_OPTS" ]; then
-  COMMON_OPTS="-Xmx768M -XX:+PrintGCDateStamps -Xloggc:$SAMZA_LOG_DIR/gc.log -Dsamza.log.dir=$SAMZA_LOG_DIR -Dsamza.container.name=$SAMZA_CONTAINER_NAME -Djava.io.tmpdir=$JAVA_TEMP_DIR"
-  GC_LOG_ROTATION_OPTS="-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024"
-  JAVA_OPTS="$COMMON_OPTS"
-  # Check whether the JVM supports GC Log rotation, and enable it if so.
+# Check whether the JVM supports GC Log rotation, and enable it if so.
+function enable_gc_log_rotation {
   `java -Xloggc:/dev/null $GC_LOG_ROTATION_OPTS -version 2> /dev/null`
   if [ $? -eq 0 ] ; then
     JAVA_OPTS="$JAVA_OPTS $GC_LOG_ROTATION_OPTS"
   fi
-  JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=$SAMZA_LOG4J_CONFIG"
+}
+
+# Use 64-bit mode (-d64) if the JVM supports it
+function check_and_enable_64_bit_mode {
+  `java -d64 -version`
+  if [ $? -eq 0 ] ; then
+    JAVA_OPTS="$JAVA_OPTS -d64"
+  fi
+}
+
+# Initialize SAMZA_LOG4J_CONFIG if currently undefined
+if [ -z "$SAMZA_LOG4J_CONFIG" ]; then
+  export SAMZA_LOG4J_CONFIG=file:$base_dir/lib/log4j.xml
 fi
 
+### Initialize JVM OPTS ###
+
+# If JAVA_OPTS is not specified in job properties (task.opts), initialize to default
+if [ -z "$JAVA_OPTS" ]; then
+  # Enable GC related flags
+  JAVA_OPTS="-Xmx768M -XX:+PrintGCDateStamps -Xloggc:$SAMZA_LOG_DIR/gc.log"
+
+  # Enable GC log rotation by default
+  enable_gc_log_rotation
+else
+  # Otherwise, check whether the GC-related flags are specified. If not, add them to JAVA_OPTS.
+  [[ $JAVA_OPTS != *PrintGCDateStamps* && $JAVA_OPTS != *loggc* ]] && JAVA_OPTS="$JAVA_OPTS -XX:+PrintGCDateStamps -Xloggc:$SAMZA_LOG_DIR/gc.log"
+
+  # Also check whether GC log rotation is already enabled. If not, add the respective flags to JAVA_OPTS.
+  [[ $JAVA_OPTS != *UseGCLogFileRotation* ]] && enable_gc_log_rotation
+fi
+
+check_and_enable_64_bit_mode
+JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=$SAMZA_LOG4J_CONFIG -Dsamza.log.dir=$SAMZA_LOG_DIR -Dsamza.container.name=$SAMZA_CONTAINER_NAME -Djava.io.tmpdir=$JAVA_TEMP_DIR"
+
 echo $JAVA $JAVA_OPTS -cp $CLASSPATH $@
 exec $JAVA $JAVA_OPTS -cp $CLASSPATH $@
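
The JAVA_OPTS defaulting in the run-class.sh hunk above can be sketched as a standalone function. This is a minimal sketch under stated assumptions, not the script itself: the function name build_java_opts and its two arguments are hypothetical, it uses a portable case statement in place of the [[ ]] test, and it leaves out the JVM probe for GC log rotation so that it runs without a JVM installed.

```shell
# Hypothetical helper mirroring the defaulting logic in run-class.sh.
# $1: user-supplied options (what task.opts would place in JAVA_OPTS); may be empty
# $2: log directory (stands in for SAMZA_LOG_DIR)
build_java_opts() (
  opts="$1"
  log_dir="$2"
  gc_flags="-XX:+PrintGCDateStamps -Xloggc:$log_dir/gc.log"
  case "$opts" in
    "")
      # Nothing supplied: use the default heap size plus GC logging.
      opts="-Xmx768M $gc_flags" ;;
    *PrintGCDateStamps*|*loggc*)
      # GC logging flags already present: leave the options untouched.
      ;;
    *)
      # Options supplied without GC logging flags: append them.
      opts="$opts $gc_flags" ;;
  esac
  echo "$opts"
)

build_java_opts "" /tmp/logs
# prints: -Xmx768M -XX:+PrintGCDateStamps -Xloggc:/tmp/logs/gc.log
build_java_opts "-Xmx2G" /tmp/logs
# prints: -Xmx2G -XX:+PrintGCDateStamps -Xloggc:/tmp/logs/gc.log
build_java_opts "-Xmx2G -Xloggc:/var/log/my-gc.log" /tmp/logs
# prints: -Xmx2G -Xloggc:/var/log/my-gc.log
```

The second call shows the behavior this commit is after: a heap size supplied through task.opts is kept, and only the missing GC logging flags are appended.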

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/97476e32/samza-shell/src/main/bash/run-container.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/run-container.sh b/samza-shell/src/main/bash/run-container.sh
index b43bc3f..72cee18 100755
--- a/samza-shell/src/main/bash/run-container.sh
+++ b/samza-shell/src/main/bash/run-container.sh
@@ -16,8 +16,4 @@
 # specific language governing permissions and limitations
 # under the License.
 
-if [ -z "$SAMZA_LOG4J_CONFIG" ]; then
-  export SAMZA_LOG4J_CONFIG=file:$(dirname $0)/../lib/log4j.xml
-fi
-
 exec $(dirname $0)/run-class.sh org.apache.samza.container.SamzaContainer $@
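
With the SAMZA_LOG4J_CONFIG default removed from run-am.sh and run-container.sh, the fallback is applied once in run-class.sh. A minimal sketch of that fallback follows, assuming a hypothetical helper named resolve_log4j_config whose argument stands in for base_dir. The paths are illustrative only.

```shell
# Hypothetical helper: prints the log4j configuration URL that would be used.
# $1: base directory of the job package (stands in for base_dir in run-class.sh)
resolve_log4j_config() {
  if [ -z "$SAMZA_LOG4J_CONFIG" ]; then
    # Not set by the caller: fall back to the log4j.xml shipped in lib/.
    echo "file:$1/lib/log4j.xml"
  else
    # Already set in the environment: respect the caller's choice.
    echo "$SAMZA_LOG4J_CONFIG"
  fi
}

unset SAMZA_LOG4J_CONFIG
resolve_log4j_config /opt/job
# prints: file:/opt/job/lib/log4j.xml

SAMZA_LOG4J_CONFIG=file:/etc/samza/log4j.xml
resolve_log4j_config /opt/job
# prints: file:/etc/samza/log4j.xml
```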

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/97476e32/samza-test/src/main/resources/hello-stateful-world.samsa
----------------------------------------------------------------------
diff --git a/samza-test/src/main/resources/hello-stateful-world.samsa b/samza-test/src/main/resources/hello-stateful-world.samsa
index 8c2f2e4..745f881 100644
--- a/samza-test/src/main/resources/hello-stateful-world.samsa
+++ b/samza-test/src/main/resources/hello-stateful-world.samsa
@@ -17,7 +17,7 @@
 # under the License.
 
 # Job
-job.factory.class=samza.job.local.LocalJobFactory
+job.factory.class=samza.job.local.ThreadJobFactory
 job.name=hello-stateful-world
 
 # Task

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/97476e32/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 3ed8b5c..44ab623 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -20,7 +20,6 @@
 package org.apache.samza.test.integration
 
 import java.util.Properties
-import java.util.concurrent.CountDownLatch
 import java.util.concurrent.TimeUnit
 import kafka.admin.AdminUtils
 import kafka.common.ErrorMapping
@@ -41,11 +40,12 @@ import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.Partition
 import org.apache.samza.checkpoint.Checkpoint
 import org.apache.samza.config.Config
+import org.apache.samza.job.local.ThreadJobFactory
+import java.util.concurrent.CountDownLatch
 import org.apache.samza.config.MapConfig
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.ApplicationStatus
 import org.apache.samza.job.StreamJob
-import org.apache.samza.job.local.LocalJobFactory
 import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.system.kafka.TopicMetadataCache
 import org.apache.samza.system.{SystemStreamPartition, IncomingMessageEnvelope}
@@ -181,7 +181,7 @@ object TestStatefulTask {
  * 1. Starts ZK, and 3 kafka brokers.
  * 2. Create two topics: input and mystore.
  * 3. Validate that the topics were created successfully and have leaders.
- * 4. Start a single partition of TestTask using LocalJobFactory/ThreadJob.
+ * 4. Start a single partition of TestTask using ThreadJobFactory.
  * 5. Send four messages to input (1,2,3,2), which contain one dupe (2).
  * 6. Validate that all messages were received by TestTask.
  * 7. Validate that TestTask called store.put() for all four messages, and that the messages ended up in the mystore topic.
@@ -193,9 +193,10 @@ object TestStatefulTask {
  */
 class TestStatefulTask {
   import TestStatefulTask._
+  val jobFactory = new ThreadJobFactory
 
   val jobConfig = Map(
-    "job.factory.class" -> "org.apache.samza.job.local.LocalJobFactory",
+    "job.factory.class" -> jobFactory.getClass.getCanonicalName,
     "job.name" -> "hello-stateful-world",
     "task.class" -> "org.apache.samza.test.integration.TestTask",
     "task.inputs" -> "kafka.input",
@@ -317,7 +318,6 @@ class TestStatefulTask {
    * time, number of partitions, etc.
    */
   def startJob = {
-    val jobFactory = new LocalJobFactory
     val job = jobFactory.getJob(new MapConfig(jobConfig))
 
     // Start task.

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/97476e32/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
index 1f4c247..688e74e 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
@@ -19,12 +19,12 @@
 
 package org.apache.samza.test.performance
 
+import org.apache.samza.job.local.ThreadJobFactory
 import org.junit.Test
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskCoordinator
 import org.apache.samza.task.MessageCollector
 import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.job.local.LocalJobFactory
 import org.apache.samza.config.MapConfig
 import scala.collection.JavaConversions._
 import org.apache.samza.job.ShellCommandBuilder
@@ -77,8 +77,10 @@ class TestSamzaContainerPerformance extends Logging{
   var logInterval = System.getProperty("samza.task.log.interval", "10000").toInt
   var maxMessages = System.getProperty("samza.task.max.messages", "10000000").toInt
 
+  val jobFactory = new ThreadJobFactory
+
   val jobConfig = Map(
-    "job.factory.class" -> "org.apache.samza.job.local.LocalJobFactory",
+    "job.factory.class" -> jobFactory.getClass.getCanonicalName,
     "job.name" -> "test-container-performance",
     "task.class" -> classOf[TestPerformanceTask].getName,
     "task.inputs" -> (0 until streamCount).map(i => "mock.stream" + i).mkString(","),
@@ -94,7 +96,6 @@ class TestSamzaContainerPerformance extends Logging{
   def testContainerPerformance {
     info("Testing performance with configuration: %s" format jobConfig)
 
-    val jobFactory = new LocalJobFactory
     val job = jobFactory
       .getJob(new MapConfig(jobConfig))
       .submit


[2/2] git commit: SAMZA-360; check-all.sh sets java home for gradle correctly now

Posted by cr...@apache.org.
SAMZA-360; check-all.sh sets java home for gradle correctly now


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/38d659b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/38d659b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/38d659b3

Branch: refs/heads/master
Commit: 38d659b33565900e96822ac85e4e0a644ac38a66
Parents: 97476e3
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Thu Jul 31 14:48:34 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Thu Jul 31 14:48:34 2014 -0700

----------------------------------------------------------------------
 bin/check-all.sh | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/38d659b3/bin/check-all.sh
----------------------------------------------------------------------
diff --git a/bin/check-all.sh b/bin/check-all.sh
index cc31942..8a87c02 100755
--- a/bin/check-all.sh
+++ b/bin/check-all.sh
@@ -61,9 +61,11 @@ do
   for scala_version in "${SCALAs[@]}"
   do
     jdk_number=${i:4:1}
-    echo "------------- Running check task against JDK${jdk_number}/Scala ${scala_version}"
-    JAVA_HOME=${!i}
-    $base_dir/gradlew -PscalaVersion=${scala_version} clean check
-    echo "------------- Finished running check task against JDK${jdk_number}/Scala ${scala_version}"
+    # skip because Scala 2.9.2 doesn't work with JDK8
+    if [[ "$jdk_number" != "8" ]] || [[ "$scala_version" != "2.9.2" ]]; then
+      echo "------------- Running check task against JDK${jdk_number}/Scala ${scala_version}"
+      $base_dir/gradlew -PscalaVersion=${scala_version} -Dorg.gradle.java.home=${!i} clean check $@
+      echo "------------- Finished running check task against JDK${jdk_number}/Scala ${scala_version}"
+    fi
   done
 done
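
The skip condition added to check-all.sh runs every JDK/Scala pair except JDK 8 with Scala 2.9.2. A minimal sketch of that predicate is shown below. The helper name should_run_check is hypothetical, and the JDK numbers and Scala versions in the loop are assumed values for illustration, since the diff only names JDK8 and Scala 2.9.2.

```shell
# Hypothetical helper: succeeds (exit status 0) when the pair should be checked.
# $1: JDK number, $2: Scala version
should_run_check() {
  # Only the combination JDK 8 + Scala 2.9.2 is skipped.
  [ "$1" != "8" ] || [ "$2" != "2.9.2" ]
}

# Assumed version lists, for illustration only.
for jdk in 7 8; do
  for scala in 2.9.2 2.10; do
    if should_run_check "$jdk" "$scala"; then
      echo "run:  JDK${jdk}/Scala ${scala}"
    else
      echo "skip: JDK${jdk}/Scala ${scala}"
    fi
  done
done
```

With these assumed lists, only the JDK8/Scala 2.9.2 line is reported as skipped.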