Posted to commits@samza.apache.org by jm...@apache.org on 2017/03/07 00:18:50 UTC

[1/9] samza git commit: SAMZA-1092: replace stream spec in fluent API

Repository: samza
Updated Branches:
  refs/heads/samza-fluent-api-v1 a83c69a25 -> 72fc185a0


SAMZA-1092: replace stream spec in fluent API

Replaced the StreamSpec class w/ the new one from SAMZA-1075.
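
For context: the old org.apache.samza.operators.StreamSpec was an interface that callers implemented inline, while the new org.apache.samza.system.StreamSpec is a concrete class constructed from a stream ID, a physical stream name, and a system name. A minimal before/after sketch, mirroring the example files changed below (the stream names are taken from those examples and are illustrative):

    // Before: anonymous implementation of the operators.StreamSpec interface
    StreamSpec input = new StreamSpec() {
      @Override public SystemStream getSystemStream() {
        return new SystemStream("kafka", "PageViewEvent");
      }
      @Override public Properties getProperties() {
        return null;
      }
    };

    // After: the concrete system.StreamSpec from SAMZA-1075, built directly
    StreamSpec input = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");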

Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>

Reviewers: Jacob Maes <jm...@linkedin.com>

Closes #58 from nickpan47/replace-stream-spec and squashes the following commits:

761ebb5 [Yi Pan (Data Infrastructure)] SAMZA-1092: replace StreamSpec w/ new class in system package
df953c2 [Yi Pan (Data Infrastructure)] SAMZA-1092: fix unit test
71331d8 [Yi Pan (Data Infrastructure)] SAMZA-1092: replace StreamSpec w/ new class
2fb19e9 [Yi Pan (Data Infrastructure)] SAMZA-1092: replace StreamSpec w/ new class in fluent API
ed3ad8e [Yi Pan (Data Infrastructure)] WIP: replace stream spec in fluent API


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e6147fda
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e6147fda
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e6147fda

Branch: refs/heads/samza-fluent-api-v1
Commit: e6147fdac31851e2fe91ab0740d7f2780fc87067
Parents: b5ea877
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Thu Feb 23 12:48:56 2017 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu Feb 23 12:48:56 2017 -0800

----------------------------------------------------------------------
 .../org/apache/samza/operators/StreamGraph.java |  2 +-
 .../org/apache/samza/operators/StreamSpec.java  | 46 -----------
 .../apache/samza/operators/StreamGraphImpl.java | 83 +++++++++-----------
 .../samza/example/KeyValueStoreExample.java     | 24 +-----
 .../samza/example/NoContextStreamExample.java   | 33 +-------
 .../samza/example/OrderShipmentJoinExample.java | 35 +--------
 .../samza/example/PageViewCounterExample.java   | 23 +-----
 .../samza/example/RepartitionExample.java       | 23 +-----
 .../samza/example/TestBroadcastExample.java     | 15 +---
 .../apache/samza/example/TestJoinExample.java   | 26 ++----
 .../apache/samza/example/TestWindowExample.java | 15 +---
 .../operators/impl/TestStreamOperatorImpl.java  |  1 +
 12 files changed, 69 insertions(+), 257 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e6147fda/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
index abc9861..30c4576 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
@@ -21,10 +21,10 @@ package org.apache.samza.operators;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.StreamSpec;
 
 import java.util.Map;
 
-
 /**
  * Job-level programming interface to create an operator DAG and run in various different runtime environments.
  */

http://git-wip-us.apache.org/repos/asf/samza/blob/e6147fda/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
deleted file mode 100644
index c8a5e8d..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.system.SystemStream;
-
-import java.util.Properties;
-
-
-/**
- * This interface defines the specification of a {@link SystemStream}. It will be used by the {@link org.apache.samza.system.SystemAdmin}
- * to create a {@link SystemStream}
- */
-@InterfaceStability.Unstable
-public interface StreamSpec {
-  /**
-   * Get the {@link SystemStream}
-   *
-   * @return  {@link SystemStream} object
-   */
-  SystemStream getSystemStream();
-
-  /**
-   * Get the physical properties of the {@link SystemStream}
-   *
-   * @return  the properties of this stream
-   */
-  Properties getProperties();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/e6147fda/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index dca3469..353f455 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -18,12 +18,12 @@
  */
 package org.apache.samza.operators;
 
-import java.util.Properties;
 import java.util.function.Function;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
@@ -44,6 +44,9 @@ public class StreamGraphImpl implements StreamGraph {
    */
   private int opId = 0;
 
+  // TODO: SAMZA-1101: the instantiation of physical streams and the physical sink functions should be delayed
+  // until after physical deployment. The input/output/intermediate stream creation should also be delegated to {@link ExecutionEnvironment}
+  // s.t. we can allow different physical instantiations of streams under different execution environments w/o code change.
   private class InputStreamImpl<K, V, M extends MessageEnvelope<K, V>> extends MessageStreamImpl<M> {
     final StreamSpec spec;
     final Serde<K> keySerde;
@@ -83,7 +86,7 @@ public class StreamGraphImpl implements StreamGraph {
         // TODO: need to find a way to directly pass in the serde class names
         // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
         //    message.getKey(), message.getKey(), message.getMessage()));
-        mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage()));
+        mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), message.getKey(), message.getMessage()));
       };
     }
   }
@@ -112,10 +115,10 @@ public class StreamGraphImpl implements StreamGraph {
         // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
         //    message.getKey(), message.getKey(), message.getMessage()));
         if (this.parKeyFn == null) {
-          mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage()));
+          mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), message.getKey(), message.getMessage()));
         } else {
           // apply partition key function
-          mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.parKeyFn.apply(message), message.getKey(), message.getMessage()));
+          mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), this.parKeyFn.apply(message), message.getKey(), message.getMessage()));
         }
       };
     }
@@ -124,17 +127,17 @@ public class StreamGraphImpl implements StreamGraph {
   /**
    * Maps keeping all {@link SystemStream}s that are input and output of operators in {@link StreamGraphImpl}
    */
-  private final Map<SystemStream, MessageStream> inStreams = new HashMap<>();
-  private final Map<SystemStream, OutputStream> outStreams = new HashMap<>();
+  private final Map<String, MessageStream> inStreams = new HashMap<>();
+  private final Map<String, OutputStream> outStreams = new HashMap<>();
 
   private ContextManager contextManager = new ContextManager() { };
 
   @Override
   public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
-    if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
-      this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
+    if (!this.inStreams.containsKey(streamSpec.getId())) {
+      this.inStreams.putIfAbsent(streamSpec.getId(), new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
     }
-    return this.inStreams.get(streamSpec.getSystemStream());
+    return this.inStreams.get(streamSpec.getId());
   }
 
   /**
@@ -146,10 +149,10 @@ public class StreamGraphImpl implements StreamGraph {
    */
   @Override
   public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
-    if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
-      this.outStreams.putIfAbsent(streamSpec.getSystemStream(), new OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
+    if (!this.outStreams.containsKey(streamSpec.getId())) {
+      this.outStreams.putIfAbsent(streamSpec.getId(), new OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
     }
-    return this.outStreams.get(streamSpec.getSystemStream());
+    return this.outStreams.get(streamSpec.getId());
   }
 
   /**
@@ -161,12 +164,12 @@ public class StreamGraphImpl implements StreamGraph {
    */
   @Override
   public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
-    if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
-      this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde));
+    if (!this.inStreams.containsKey(streamSpec.getId())) {
+      this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde));
     }
-    IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getSystemStream());
-    if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
-      this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream);
+    IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getId());
+    if (!this.outStreams.containsKey(streamSpec.getId())) {
+      this.outStreams.putIfAbsent(streamSpec.getId(), intStream);
     }
     return intStream;
   }
@@ -200,12 +203,15 @@ public class StreamGraphImpl implements StreamGraph {
   /**
    * Helper method to be get the input stream via {@link SystemStream}
    *
-   * @param systemStream  the {@link SystemStream}
+   * @param sstream  the {@link SystemStream}
    * @return  a {@link MessageStreamImpl} object corresponding to the {@code systemStream}
    */
-  public MessageStreamImpl getInputStream(SystemStream systemStream) {
-    if (this.inStreams.containsKey(systemStream)) {
-      return (MessageStreamImpl) this.inStreams.get(systemStream);
+  public MessageStreamImpl getInputStream(SystemStream sstream) {
+    for(MessageStream entry: this.inStreams.values()) {
+      if (((InputStreamImpl) entry).getSpec().getSystemName() == sstream.getSystem() &&
+          ((InputStreamImpl) entry).getSpec().getPhysicalName() == sstream.getStream()) {
+        return (MessageStreamImpl) entry;
+      }
     }
     return null;
   }
@@ -217,13 +223,6 @@ public class StreamGraphImpl implements StreamGraph {
     return null;
   }
 
-  <M> MessageStream<M> getIntStream(OutputStream<M> outStream) {
-    if (this.inStreams.containsValue(outStream)) {
-      return (MessageStream<M>) outStream;
-    }
-    return null;
-  }
-
   /**
    * Method to create intermediate topics for {@link MessageStreamImpl#partitionBy(Function)} method.
    *
@@ -234,27 +233,21 @@ public class StreamGraphImpl implements StreamGraph {
    */
   <PK, M> MessageStreamImpl<M> createIntStream(Function<M, PK> parKeyFn) {
     // TODO: placeholder to auto-generate intermediate streams via {@link StreamSpec}
-    StreamSpec streamSpec = new StreamSpec() {
-      @Override
-      public SystemStream getSystemStream() {
-        // TODO: should auto-generate intermedaite stream name here
-        return new SystemStream("intermediate", String.format("par-%d", StreamGraphImpl.this.opId));
-      }
+    StreamSpec streamSpec = this.createIntStreamSpec();
 
-      @Override
-      public Properties getProperties() {
-        return null;
-      }
-    };
-
-    if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
-      this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
+    if (!this.inStreams.containsKey(streamSpec.getId())) {
+      this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
     }
-    IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getSystemStream());
-    if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
-      this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream);
+    IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getId());
+    if (!this.outStreams.containsKey(streamSpec.getId())) {
+      this.outStreams.putIfAbsent(streamSpec.getId(), intStream);
     }
     return intStream;
   }
 
+  private StreamSpec createIntStreamSpec() {
+    // TODO: placeholder to generate the intermediate stream's {@link StreamSpec} automatically
+    return null;
+  }
+
 }
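
In short, StreamGraphImpl now keys its input/output stream maps by StreamSpec.getId() rather than by SystemStream, and rebuilds the physical SystemStream from the spec when sending. A brief sketch of the new mapping (variable names are illustrative):

    // Streams are registered and looked up by their logical stream ID...
    MessageStream in = inStreams.get(streamSpec.getId());

    // ...and the physical destination is derived from the spec at send time.
    SystemStream out = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());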

http://git-wip-us.apache.org/repos/asf/samza/blob/e6147fda/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
index 85ebc6c..4a0681e 100644
--- a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -32,12 +32,10 @@ import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.CommandLine;
 
-import java.util.Properties;
-
 
 /**
  * Example code using {@link KeyValueStore} to implement event-time window
@@ -113,25 +111,9 @@ public class KeyValueStoreExample implements StreamGraphBuilder {
     }
   }
 
-  StreamSpec input1 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewEvent");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
 
-  StreamSpec output = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewPerMember5min");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
 
   class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
     String pageId;

http://git-wip-us.apache.org/repos/asf/samza/blob/e6147fda/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
index c6d2e6e..320680c 100644
--- a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
@@ -28,13 +28,12 @@ import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.CommandLine;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Properties;
 
 
 /**
@@ -42,35 +41,11 @@ import java.util.Properties;
  */
 public class NoContextStreamExample implements StreamGraphBuilder {
 
-  StreamSpec input1 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "input1");
-    }
+  StreamSpec input1 = new StreamSpec("inputStreamA", "PageViewEvent", "kafka");
 
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec input2 = new StreamSpec("inputStreamB", "RumLixEvent", "kafka");
 
-  StreamSpec input2 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "input2");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
-
-  StreamSpec output = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "output");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec output = new StreamSpec("joinedPageViewStream", "PageViewJoinRumLix", "kafka");
 
   class MessageType {
     String joinKey;

http://git-wip-us.apache.org/repos/asf/samza/blob/e6147fda/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
index 0477066..30ce7d2 100644
--- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -23,17 +23,14 @@ import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphBuilder;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
-import java.util.Properties;
-
 
 /**
  * Simple 2-way stream-to-stream join example
@@ -71,35 +68,11 @@ public class OrderShipmentJoinExample implements StreamGraphBuilder {
     standaloneEnv.run(new OrderShipmentJoinExample(), config);
   }
 
-  StreamSpec input1 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "Orders");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
-
-  StreamSpec input2 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "Shipment");
-    }
+  StreamSpec input1 = new StreamSpec("orderStream", "OrderEvent", "kafka");
 
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec input2 = new StreamSpec("shipmentStream", "ShipmentEvent", "kafka");
 
-  StreamSpec output = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "FulfilledOrders");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec output = new StreamSpec("joinedOrderShipmentStream", "OrderShipmentJoinEvent", "kafka");
 
   class OrderRecord implements MessageEnvelope<String, OrderRecord> {
     String orderId;

http://git-wip-us.apache.org/repos/asf/samza/blob/e6147fda/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index f7d8bda..fcf67a7 100644
--- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -29,11 +29,10 @@ import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
-import java.util.Properties;
 
 
 /**
@@ -62,25 +61,9 @@ public class PageViewCounterExample implements StreamGraphBuilder {
     standaloneEnv.run(new PageViewCounterExample(), config);
   }
 
-  StreamSpec input1 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewEvent");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
 
-  StreamSpec output = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewPerMember5min");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
 
   class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
     String pageId;

http://git-wip-us.apache.org/repos/asf/samza/blob/e6147fda/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
index 6994ac4..228668c 100644
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -27,11 +27,10 @@ import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.ExecutionEnvironment;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
-import java.util.*;
 
 
 /**
@@ -73,25 +72,9 @@ public class RepartitionExample implements StreamGraphBuilder {
     standaloneEnv.run(new RepartitionExample(), config);
   }
 
-  StreamSpec input1 = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewEvent");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
 
-  StreamSpec output = new StreamSpec() {
-    @Override public SystemStream getSystemStream() {
-      return new SystemStream("kafka", "PageViewPerMember5min");
-    }
-
-    @Override public Properties getProperties() {
-      return null;
-    }
-  };
+  StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
 
   class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
     String pageId;

http://git-wip-us.apache.org/repos/asf/samza/blob/e6147fda/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
index d22324b..059afce 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
@@ -22,18 +22,16 @@ package org.apache.samza.example;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
 import org.apache.samza.operators.data.InputMessageEnvelope;
 import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
 import org.apache.samza.operators.data.Offset;
 import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStreamPartition;
 
 import java.time.Duration;
 import java.util.function.BiFunction;
-import java.util.Properties;
 import java.util.Set;
 
 
@@ -70,15 +68,8 @@ public class TestBroadcastExample extends TestExampleBase {
   public void init(StreamGraph graph, Config config) {
     BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1;
     inputs.keySet().forEach(entry -> {
-        MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
-          @Override public SystemStream getSystemStream() {
-            return entry;
-          }
-
-          @Override public Properties getProperties() {
-            return null;
-          }
-        }, null, null).map(this::getInputMessage);
+        MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(
+                new StreamSpec(entry.toString(), entry.getStream(), entry.getSystem()), null, null).map(this::getInputMessage);
 
         inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
             .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));

http://git-wip-us.apache.org/repos/asf/samza/blob/e6147fda/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
index fe6e7e7..cc53814 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
@@ -22,19 +22,18 @@ package org.apache.samza.example;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
 import org.apache.samza.operators.data.InputMessageEnvelope;
 import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
 import org.apache.samza.operators.data.Offset;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Properties;
 import java.util.Set;
 
 
@@ -65,16 +64,9 @@ public class TestJoinExample  extends TestExampleBase {
   public void init(StreamGraph graph, Config config) {
 
     for (SystemStream input : inputs.keySet()) {
+      StreamSpec inputStreamSpec = new StreamSpec(input.toString(), input.getStream(), input.getSystem());
       MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream(
-          new StreamSpec() {
-            @Override public SystemStream getSystemStream() {
-              return input;
-            }
-
-            @Override public Properties getProperties() {
-              return null;
-            }
-          }, null, null).map(this::getInputMessage);
+          inputStreamSpec, null, null).map(this::getInputMessage);
       if (joinOutput == null) {
         joinOutput = newSource;
       } else {
@@ -82,15 +74,9 @@ public class TestJoinExample  extends TestExampleBase {
       }
     }
 
-    joinOutput.sendTo(graph.createOutStream(new StreamSpec() {
-      @Override public SystemStream getSystemStream() {
-        return null;
-      }
-
-      @Override public Properties getProperties() {
-        return null;
-      }
-    }, new StringSerde("UTF-8"), new JsonSerde<>()));
+    joinOutput.sendTo(graph.createOutStream(
+            new StreamSpec("joinOutput", "JoinOutputEvent", "kafka"),
+            new StringSerde("UTF-8"), new JsonSerde<>()));
 
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e6147fda/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
index e08ca20..73f4674 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
@@ -21,18 +21,16 @@ package org.apache.samza.example;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
 import org.apache.samza.operators.data.InputMessageEnvelope;
 import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.data.Offset;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStreamPartition;
 
 import java.time.Duration;
 import java.util.function.BiFunction;
-import java.util.Properties;
 import java.util.Set;
 
 
@@ -60,15 +58,8 @@ public class TestWindowExample extends TestExampleBase {
   @Override
   public void init(StreamGraph graph, Config config) {
     BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1;
-    inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
-      @Override public SystemStream getSystemStream() {
-        return source;
-      }
-
-      @Override public Properties getProperties() {
-        return null;
-      }
-    }, null, null).
+    inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(
+            new StreamSpec(source.toString(), source.getStream(), source.getSystem()), null, null).
         map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(),
             m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator)));
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e6147fda/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index 010a210..0a873fd 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.*;
 public class TestStreamOperatorImpl {
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testSimpleOperator() {
     StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
     FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);


[6/9] samza git commit: SAMZA-1103: ZkBarrier

Posted by jm...@apache.org.
SAMZA-1103: ZkBarrier

SAMZA-1103: Barrier for JobModel upgrades. Only after all processors have been notified about the new JobModel can they start using the new model.
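
The intended flow, as exercised by the new TestZkBarrierForVersionUpgrade below, is roughly the following (the callback body is illustrative):

    // The leader starts a barrier for a given JobModel version, naming the
    // processors that must join before the barrier is considered reached.
    BarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(zkUtils, debounceTimer);
    barrier.startBarrier("1", Arrays.asList("p1", "p2"));

    // Each processor then registers itself; the callback fires once every
    // listed processor has joined and the barrier's "done" flag is written.
    barrier.waitForBarrier("1", "p1", () -> {
        // switch to the new JobModel here (illustrative)
      });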

Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Author: Boris Shkolnik <bo...@apache.org>
Author: navina <na...@apache.org>

Reviewers: Fred Ji <fr...@yahoo.com>, Navina Ramesh <na...@apache.org>, Xiliu Liu <xi...@linkedin.com>

Closes #61 from sborya/ZkBarrier


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4d7b3b35
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4d7b3b35
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4d7b3b35

Branch: refs/heads/samza-fluent-api-v1
Commit: 4d7b3b3534ed804ad54227901bf3bbaff32814e1
Parents: c58d74b
Author: Boris Shkolnik <bo...@apache.org>
Authored: Tue Feb 28 17:56:50 2017 -0800
Committer: navina <na...@apache.org>
Committed: Tue Feb 28 17:56:50 2017 -0800

----------------------------------------------------------------------
 .../samza/zk/BarrierForVersionUpgrade.java      |  46 +++++
 .../samza/zk/ScheduleAfterDebounceTime.java     |   8 +-
 .../samza/zk/ZkBarrierForVersionUpgrade.java    | 166 +++++++++++++++++++
 .../java/org/apache/samza/zk/ZkKeyBuilder.java  |   4 +-
 .../main/java/org/apache/samza/zk/ZkUtils.java  |  11 +-
 .../apache/samza/task/ReadableCoordinator.scala |   1 +
 .../zk/TestZkBarrierForVersionUpgrade.java      | 148 +++++++++++++++++
 .../apache/samza/zk/TestZkLeaderElector.java    |  11 +-
 8 files changed, 379 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
new file mode 100644
index 0000000..b2d80d0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import java.util.List;
+
+
+/**
+ * Interface for a barrier - to allow synchronization between different processors to switch to a newly published
+ * JobModel.
+ */
+public interface BarrierForVersionUpgrade {
+  /**
+   * Barrier is usually started by the leader.
+   * @param version - for which the barrier is started.
+   * @param processorsNames - list of processors available at the time of the JobModel generation.
+   */
+  void startBarrier(String version,  List<String> processorsNames);
+
+  /**
+   * Called by the processor.
+   * Updates the processor's readiness to use the new version and waits on the barrier until all other processors
+   * have joined.
+   * @param version of the jobModel this barrier is protecting.
+   * @param processorsName as it appears in the list of processors.
+   * @param callback  will be invoked, when barrier is reached.
+   */
+  void waitForBarrier(String version, String processorsName, Runnable callback);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index 0a4db6d..289d900 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -42,15 +42,17 @@ public class ScheduleAfterDebounceTime {
   public static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
   public static final long TIMEOUT_MS = 1000 * 10; // timeout to wait for a task to complete
 
+  // Names of actions.
+  // When the same action is scheduled it needs to cancel the previous one.
+  // To accomplish that we keep the previous future in a map, keyed by the action name.
+
+  // Here we predefine some actions which are used in the ZK based standalone app.
   // Action name when the JobModel version changes
   public static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange";
 
   // Action name when the Processor membership changes
   public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
 
-  // Action name when the Processor Data changes
-  public static final String ON_DATA_CHANGE_ON = "OnDataChangeOn";
-
   public static final int DEBOUNCE_TIME_MS = 2000;
 
   private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
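
The action names above serve as debounce keys: scheduling the same action again cancels the previously scheduled run, so only the latest runnable fires after the debounce interval. A hedged usage sketch based on the call made in ZkBarrierForVersionUpgrade (the log message is illustrative):

    // Coalesce bursts of membership changes into a single deferred action.
    debounceTimer.scheduleAfterDebounceTime(
        ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
        ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS,
        () -> LOG.info("processor membership changed"));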

http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
new file mode 100644
index 0000000..3ec87b0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import java.util.List;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
+  private final ZkUtils zkUtils;
+  private final ZkKeyBuilder keyBuilder;
+  private final static String BARRIER_DONE = "done";
+  private final static Logger LOG = LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class);
+
+  private final ScheduleAfterDebounceTime debounceTimer;
+
+  private final String barrierPrefix;
+
+  public ZkBarrierForVersionUpgrade(ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer) {
+    this.zkUtils = zkUtils;
+    keyBuilder = zkUtils.getKeyBuilder();
+
+    barrierPrefix = keyBuilder.getJobModelVersionBarrierPrefix();
+    this.debounceTimer = debounceTimer;
+  }
+
+  @Override
+  public void startBarrier(String version, List<String> processorsNames) {
+    String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+    String barrierDonePath = String.format("%s/barrier_done", barrierPath);
+    String barrierProcessors = String.format("%s/barrier_processors", barrierPath);
+
+    zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, barrierPath, barrierProcessors, barrierDonePath});
+
+    // callback for when the barrier is reached
+    Runnable callback = new Runnable() {
+      @Override
+      public void run() {
+        LOG.info("Writing BARRIER DONE to " + barrierDonePath);
+        zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE);
+      }
+    };
+    // subscribe for processor's list changes
+    LOG.info("Subscribing for child changes at " + barrierProcessors);
+    zkUtils.getZkClient().subscribeChildChanges(barrierProcessors,
+        new ZkBarrierChangeHandler(callback, processorsNames));
+  }
+
+  @Override
+  public void waitForBarrier(String version, String processorsName, Runnable callback) {
+    // if participant makes this call it means it has already stopped the old container and got the new job model.
+    String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+    String barrierDonePath = String.format("%s/barrier_done", barrierPath);
+    String barrierProcessors = String.format("%s/barrier_processors", barrierPath);
+    String barrierProcessorThis = String.format("%s/%s", barrierProcessors, processorsName);
+
+
+    // update the barrier for this processor
+    LOG.info("Creating a child for barrier at " + barrierProcessorThis);
+    zkUtils.getZkClient().createPersistent(barrierProcessorThis);
+
+    // now subscribe for the barrier
+    zkUtils.getZkClient().subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, debounceTimer, callback));
+  }
+
+  /**
+   * listener for the subscription.
+   */
+  class ZkBarrierChangeHandler implements IZkChildListener {
+    Runnable callback;
+    List<String> names;
+
+    public ZkBarrierChangeHandler(Runnable callback, List<String> names) {
+      this.callback = callback;
+      this.names = names;
+    }
+
+    @Override
+    public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
+      // Find out the event & Log
+      boolean allIn = true;
+
+      if (currentChildren == null) {
+        LOG.info("Got handleChildChange with null currentChildren");
+        return;
+      }
+      // debug
+      StringBuilder sb = new StringBuilder();
+      for (String child : currentChildren) {
+        sb.append(child).append(",");
+      }
+      LOG.info("list of children in the barrier = " + parentPath + ":" + sb.toString());
+      sb = new StringBuilder();
+      for (String child : names) {
+        sb.append(child).append(",");
+      }
+      LOG.info("list of children to compare against = " + parentPath + ":" + sb.toString());
+
+
+      // check if all the names are in
+      for (String n : names) {
+        if (!currentChildren.contains(n)) {
+          LOG.info("node " + n + " is still not in the list ");
+          allIn = false;
+          break;
+        }
+      }
+      if (allIn) {
+        LOG.info("ALl nodes reached the barrier");
+        callback.run(); // all the names have registered
+      }
+    }
+  }
+
+  class ZkBarrierReachedHandler implements IZkDataListener {
+    private final ScheduleAfterDebounceTime debounceTimer;
+    private final String barrierPathDone;
+    private final Runnable callback;
+
+    public ZkBarrierReachedHandler(String barrierPathDone, ScheduleAfterDebounceTime debounceTimer, Runnable callback) {
+      this.barrierPathDone = barrierPathDone;
+      this.callback = callback;
+      this.debounceTimer = debounceTimer;
+    }
+
+    @Override
+    public void handleDataChange(String dataPath, Object data)
+        throws Exception {
+      String done = (String) data;
+      LOG.info("got notification about barrier path=" + barrierPathDone + "; done=" + done);
+      if (done.equals(BARRIER_DONE)) {
+        zkUtils.unsubscribeDataChanges(barrierPathDone, this);
+        debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, callback);
+      }
+      // we do not need to resubscribe because the ZkClient library does it for us.
+
+    }
+
+    @Override
+    public void handleDataDeleted(String dataPath)
+        throws Exception {
+      LOG.warn("barrier done got deleted at " + dataPath);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index d6cb9f3..0a8f37e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -88,5 +88,7 @@ public class ZkKeyBuilder {
     return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion);
   }
 
-
+  public String getJobModelVersionBarrierPrefix() {
+    return String.format("/%s/versionBarriers", pathPrefix);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index b11e02f..320cd49 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -19,6 +19,9 @@
 
 package org.apache.samza.zk;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
@@ -27,10 +30,6 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Util class to help manage Zk connection and ZkClient.
  * It also provides additional utility methods for read/write/subscribe/unsubscribe access to the ZK tree.
@@ -165,7 +164,7 @@ public class ZkUtils {
 
   /**
    * verify that given paths exist in ZK
-   * @param paths
+   * @param paths - paths to verify or create
    */
   public void makeSurePersistentPathsExists(String[] paths) {
     for (String path : paths) {
@@ -177,7 +176,7 @@ public class ZkUtils {
 
   /**
    * subscribe to the changes in the list of processors in ZK
-   * @param listener
+   * @param listener - will be called when a processor is added or removed.
    */
   public void subscribeToProcessorChange(IZkChildListener listener) {
     LOG.info("pid=" + processorId + " subscribing for child change at:" + keyBuilder.getProcessorsPath());

http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
index 6e1134d..6c7641b 100644
--- a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
@@ -38,3 +38,4 @@ class ReadableCoordinator(val taskName: TaskName) extends TaskCoordinator {
   def requestedShutdownOnConsensus = shutdownRequest.isDefined && shutdownRequest.get == RequestScope.CURRENT_TASK
   def requestedShutdownNow         = shutdownRequest.isDefined && shutdownRequest.get == RequestScope.ALL_TASKS_IN_CONTAINER
 }
+

http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
new file mode 100644
index 0000000..92cb2c9
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.ArrayList;
+import java.util.List;
+import junit.framework.Assert;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestZkBarrierForVersionUpgrade {
+  private static EmbeddedZookeeper zkServer = null;
+  private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
+  private String testZkConnectionString = null;
+  private ZkUtils testZkUtils = null;
+  private static final int SESSION_TIMEOUT_MS = 20000;
+  private static final int CONNECTION_TIMEOUT_MS = 10000;
+
+  @BeforeClass
+  public static void setup() throws InterruptedException {
+    zkServer = new EmbeddedZookeeper();
+    zkServer.setup();
+  }
+
+  @Before
+  public void testSetup() {
+    testZkConnectionString = "localhost:" + zkServer.getPort();
+    try {
+      testZkUtils = getZkUtilsWithNewClient();
+    } catch (Exception e) {
+      Assert.fail("Client connection setup failed. Aborting tests..");
+    }
+  }
+
+  @After
+  public void testTeardown() {
+    testZkUtils.deleteRoot();
+    testZkUtils.close();
+    testZkUtils = null;
+  }
+
+  @AfterClass
+  public static void teardown() {
+    zkServer.teardown();
+  }
+
+  @Test
+  public void testZkBarrierForVersionUpgrade() {
+    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+    ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer);
+    String ver = "1";
+    List<String> processors = new ArrayList<String>();
+    processors.add("p1");
+    processors.add("p2");
+
+    class Status {
+      boolean p1 = false;
+      boolean p2 = false;
+    }
+    final Status s = new Status();
+
+    barrier.startBarrier(ver, processors);
+
+    barrier.waitForBarrier(ver, "p1", new Runnable() {
+      @Override
+      public void run() {
+        s.p1 = true;
+      }
+    });
+
+    barrier.waitForBarrier(ver, "p2", new Runnable() {
+      @Override
+      public void run() {
+        s.p2 = true;
+      }
+    });
+
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2, 2, 100));
+  }
+
+  @Test
+  public void testNegativeZkBarrierForVersionUpgrade() {
+    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+    ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer);
+    String ver = "1";
+    List<String> processors = new ArrayList<String>();
+    processors.add("p1");
+    processors.add("p2");
+    processors.add("p3");
+
+    class Status {
+      boolean p1 = false;
+      boolean p2 = false;
+      boolean p3 = false;
+    }
+    final Status s = new Status();
+
+    barrier.startBarrier(ver, processors);
+
+    barrier.waitForBarrier(ver, "p1", new Runnable() {
+      @Override
+      public void run() {
+        s.p1 = true;
+      }
+    });
+
+    barrier.waitForBarrier(ver, "p2", new Runnable() {
+      @Override
+      public void run() {
+        s.p2 = true;
+      }
+    });
+
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 100));
+
+  }
+
+
+  private ZkUtils getZkUtilsWithNewClient() {
+    ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
+    return new ZkUtils(
+        KEY_BUILDER,
+        ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
+        CONNECTION_TIMEOUT_MS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index 6342fde..bfda464 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -18,6 +18,11 @@
  */
 package org.apache.samza.zk;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
@@ -30,12 +35,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;


[8/9] samza git commit: SAMZA-1096: StreamSpec constructors in the ExecutionEnvironments

Posted by jm...@apache.org.
SAMZA-1096: StreamSpec constructors in the ExecutionEnvironments

Author: Jacob Maes <jm...@linkedin.com>

Reviewers: Yi Pan (Data Infrastructure) <ni...@gmail.com>,Xinyu Liu <xi...@linkedin.com>,Navina Ramesh <na...@apache.org>

Closes #74 from jmakes/samza-1096


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e6c1eed4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e6c1eed4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e6c1eed4

Branch: refs/heads/samza-fluent-api-v1
Commit: e6c1eed4f1d576661abafce8477c1749c2554b39
Parents: d104013
Author: Jacob Maes <jm...@linkedin.com>
Authored: Mon Mar 6 14:52:18 2017 -0800
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Mon Mar 6 14:52:18 2017 -0800

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     | 225 +++++++++----
 .../documentation/versioned/jobs/logging.md     |   5 +
 .../samza/system/ExecutionEnvironment.java      |  32 +-
 .../org/apache/samza/system/StreamSpec.java     |  15 +-
 .../system/AbstractExecutionEnvironment.java    |  88 +++++
 .../system/RemoteExecutionEnvironment.java      |   6 +-
 .../system/StandaloneExecutionEnvironment.java  |   6 +-
 .../org/apache/samza/config/JobConfig.scala     |   3 +
 .../org/apache/samza/config/StreamConfig.scala  | 179 ++++++++--
 .../samza/example/TestBroadcastExample.java     |   9 +-
 .../apache/samza/example/TestJoinExample.java   |   2 +-
 .../apache/samza/example/TestWindowExample.java |   2 +-
 .../TestAbstractExecutionEnvironment.java       | 331 +++++++++++++++++++
 .../org/apache/samza/config/KafkaConfig.scala   |   8 +-
 .../apache/samza/config/Log4jSystemConfig.java  |  12 +-
 15 files changed, 810 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index a26bc43..ba04139 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -156,13 +156,21 @@
                         others' checkpoints, and perhaps interfere with each other in other ways.
                     </td>
                 </tr>
                 <tr>
                     <td class="property" id="job-coordinator-system">job.coordinator.system</td>
                     <td class="default"></td>
                     <td class="description">
                         <strong>Required:</strong> The <span class="system">system-name</span> to use for creating and maintaining the <a href="../container/coordinator-stream.html">Coordinator Stream</a>.
                     </td>
                 </tr>
+                <tr>
+                    <td class="property" id="job-default-system">job.default.system</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        The <span class="system">system-name</span> to access any input or output streams for which the system is not explicitly configured.
+                        This property is for input and output streams, whereas job.coordinator.system is for Samza metadata streams.
+                    </td>
+                </tr>
 
                 <tr>
                     <td class="property" id="job-coordinator-replication-factor">job.coordinator.<br />replication.factor</td>
@@ -254,7 +261,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="job-systemstreampartition-matcher-class">job.systemstreampartition.matcher.class</td>
+                    <td class="property" id="job-systemstreampartition-matcher-class">job.systemstreampartition.<br>matcher.class</td>
                     <td class="default"></td>
                     <td class="description">
                         If you want to enable static partition assignment, then this is a <strong>required</strong> configuration.
@@ -286,7 +293,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="job_systemstreampartition_matcher_config_range">job.systemstreampartition.matcher.config.range</td>
+                    <td class="property" id="job_systemstreampartition_matcher_config_range">job.systemstreampartition.<br>matcher.config.range</td>
                     <td class="default"></td>
                     <td class="description">
                         If <code>job.systemstreampartition.matcher.class</code> is specified, and the value of this property is
@@ -305,7 +312,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="job_systemstreampartition_matcher_config_regex">job.systemstreampartition.matcher.config.regex</td>
+                    <td class="property" id="job_systemstreampartition_matcher_config_regex">job.systemstreampartition.<br>matcher.config.regex</td>
                     <td class="default"></td>
                     <td class="description">
                         If <code>job.systemstreampartition.matcher.class</code> is specified, and the value of this property is
@@ -316,8 +323,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="job_systemstreampartition_matcher_co
-                    nfig_job_factory_regex">job.systemstreampartition.matcher.config.job.factory.regex</td>
+                    <td class="property" id="job_systemstreampartition_matcher_config_job_factory_regex">job.systemstreampartition.<br>matcher.config.job.factory.regex</td>
                     <td class="default"></td>
                     <td class="description">
                         This configuration can be used to specify the Java supported regex to match the <code>StreamJobFactory</code>
@@ -683,7 +689,7 @@
                 </tr>
 
                 <tr>
-                    <th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th>
+                    <th colspan="3" class="section" id="systems">Systems</th>
                 </tr>
 
                 <tr>
@@ -716,11 +722,12 @@
                 <tr>
                     <td class="property" id="systems-samza-key-serde">systems.<span class="system">system-name</span>.<br>samza.key.serde</td>
                     <td class="default" rowspan="2"></td>
-                    <td class="description" rowspan="2">
+                    <td class="description">
                         The <a href="../container/serialization.html">serde</a> which will be used to deserialize the
                         <em>key</em> of messages on input streams, and to serialize the <em>key</em> of messages on
-                        output streams. This property can be defined either for an individual stream, or for all
-                        streams within a system (if both are defined, the stream-level definition takes precedence).
+                        output streams. This property defines the serde for all streams in the system. See the
+                        <a href="#streams-samza-key-serde">stream-scoped property</a> to define the serde for an
+                        individual stream. If both are defined, the stream-level definition takes precedence.
                         The value of this property must be a <span class="serde">serde-name</span> that is registered
                         with <a href="#serializers-registry-class" class="property">serializers.registry.*.class</a>.
                         If this property is not set, messages are passed unmodified between the input stream consumer,
@@ -729,16 +736,21 @@
                 </tr>
                 <tr>
                     <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.key.serde</td>
+                    <td class="description">
+                        This is deprecated in favor of <a href="#streams-samza-key-serde" class="property">
+                        streams.<span class="stream">stream-id</span>.samza.key.serde</a>.
+                    </td>
                 </tr>
 
                 <tr>
                     <td class="property" id="systems-samza-msg-serde">systems.<span class="system">system-name</span>.<br>samza.msg.serde</td>
                     <td class="default" rowspan="2"></td>
-                    <td class="description" rowspan="2">
+                    <td class="description">
                         The <a href="../container/serialization.html">serde</a> which will be used to deserialize the
                         <em>value</em> of messages on input streams, and to serialize the <em>value</em> of messages on
-                        output streams. This property can be defined either for an individual stream, or for all
-                        streams within a system (if both are defined, the stream-level definition takes precedence).
+                        output streams. This property defines the serde for all streams in the system. See the
+                        <a href="#streams-samza-msg-serde">stream-scoped property</a> to define the serde for an
+                        individual stream. If both are defined, the stream-level definition takes precedence.
                         The value of this property must be a <span class="serde">serde-name</span> that is registered
                         with <a href="#serializers-registry-class" class="property">serializers.registry.*.class</a>.
                         If this property is not set, messages are passed unmodified between the input stream consumer,
@@ -747,12 +759,16 @@
                 </tr>
                 <tr>
                     <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.msg.serde</td>
+                    <td class="description">
+                        This is deprecated in favor of <a href="#streams-samza-msg-serde" class="property">
+                        streams.<span class="stream">stream-id</span>.samza.msg.serde</a>.
+                    </td>
                 </tr>
 
                 <tr>
                     <td class="property" id="systems-samza-offset-default">systems.<span class="system">system-name</span>.<br>samza.offset.default</td>
                     <td class="default" rowspan="2">upcoming</td>
-                    <td class="description" rowspan="2">
+                    <td class="description">
                         If a container starts up without a <a href="../container/checkpointing.html">checkpoint</a>,
                         this property determines where in the input stream we should start consuming. The value must be an
                         <a href="../api/javadocs/org/apache/samza/system/SystemStreamMetadata.OffsetType.html">OffsetType</a>,
@@ -765,18 +781,113 @@
                             <dd>Start processing at the oldest available message in the system, and
                                 <a href="reprocessing.html">reprocess</a> the entire available message history.</dd>
                         </dl>
-                        This property can be defined either for an individual stream, or for all streams within a system
-                        (if both are defined, the stream-level definition takes precedence).
+                        This property is for all streams within a system. To set it for an individual stream, see
+                        <a href="#streams-samza-offset-default">streams.<span class="stream">stream-id</span>.<br>samza.offset.default</a>
+                        If both are defined, the stream-level definition takes precedence.
                     </td>
                 </tr>
                 <tr>
                     <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.offset.default</td>
+                    <td class="description">
+                        This is deprecated in favor of <a href="#streams-samza-offset-default" class="property">
+                        streams.<span class="stream">stream-id</span>.samza.offset.default</a>.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="task-consumer-batch-size">task.consumer.batch.size</td>
+                    <td class="default">1</td>
+                    <td class="description">
+                        If set to a positive integer, the task will try to consume
+                        <a href="../container/streams.html#batching">batches</a> with the given number of messages
+                        from each input stream, rather than consuming round-robin from all the input streams on
+                        each individual message. Setting this property can improve performance in some cases.
+                    </td>
+                </tr>
+
+                <tr>
+                    <th colspan="3" class="section" id="streams"><a href="../container/streams.html">Streams</a></th>
+                </tr>
+
+                <tr>
+                    <td class="property" id="streams-system">streams.<span class="stream">stream-id</span>.<br>samza.system</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        The <span class="system">system-name</span> of the system on which this stream will be accessed.
+                        This property binds the stream to one of the systems defined with the property
+                        systems.<span class="system">system-name</span>.samza.factory. <br>
+                        If this property isn't specified, it is inherited from job.default.system.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="streams-physical-name">streams.<span class="stream">stream-id</span>.<br>samza.physical.name</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        The physical name of the stream on the system on which this stream will be accessed.
+                        This is in contrast to the stream-id, which is the logical name that Samza uses to identify the stream.
+                        A physical name could be a Kafka topic name, an HDFS file URN or any other system-specific identifier.
+                    </td>
                 </tr>
 
                 <tr>
-                    <td class="property" id="systems-streams-samza-reset-offset">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.reset.offset</td>
-                    <td>false</td>
-                    <td>
+                    <td class="property" id="streams-samza-key-serde">streams.<span class="stream">stream-id</span>.<br>samza.key.serde</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        The <a href="../container/serialization.html">serde</a> which will be used to deserialize the
+                        <em>key</em> of messages on input streams, and to serialize the <em>key</em> of messages on
+                        output streams. This property defines the serde for an individual stream. See the
+                        <a href="#systems-samza-key-serde">system-scoped property</a> to define the serde for all
+                        streams within a system. If both are defined, the stream-level definition takes precedence.
+                        The value of this property must be a <span class="serde">serde-name</span> that is registered
+                        with <a href="#serializers-registry-class" class="property">serializers.registry.*.class</a>.
+                        If this property is not set, messages are passed unmodified between the input stream consumer,
+                        the task and the output stream producer.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="streams-samza-msg-serde">streams.<span class="stream">stream-id</span>.<br>samza.msg.serde</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        The <a href="../container/serialization.html">serde</a> which will be used to deserialize the
+                        <em>value</em> of messages on input streams, and to serialize the <em>value</em> of messages on
+                        output streams. This property defines the serde for an individual stream. See the
+                        <a href="#systems-samza-msg-serde">system-scoped property</a> to define the serde for all
+                        streams within a system. If both are defined, the stream-level definition takes precedence.
+                        The value of this property must be a <span class="serde">serde-name</span> that is registered
+                        with <a href="#serializers-registry-class" class="property">serializers.registry.*.class</a>.
+                        If this property is not set, messages are passed unmodified between the input stream consumer,
+                        the task and the output stream producer.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="streams-samza-offset-default">streams.<span class="stream">stream-id</span>.<br>samza.offset.default</td>
+                    <td class="default">upcoming</td>
+                    <td class="description">
+                        If a container starts up without a <a href="../container/checkpointing.html">checkpoint</a>,
+                        this property determines where in the input stream we should start consuming. The value must be an
+                        <a href="../api/javadocs/org/apache/samza/system/SystemStreamMetadata.OffsetType.html">OffsetType</a>,
+                        one of the following:
+                        <dl>
+                            <dt><code>upcoming</code></dt>
+                            <dd>Start processing messages that are published after the job starts. Any messages published while
+                                the job was not running are not processed.</dd>
+                            <dt><code>oldest</code></dt>
+                            <dd>Start processing at the oldest available message in the system, and
+                                <a href="reprocessing.html">reprocess</a> the entire available message history.</dd>
+                        </dl>
+                        This property is for an individual stream. To set it for all streams within a system, see
+                        <a href="#systems-samza-offset-default">systems.<span class="system">system-name</span>.<br>samza.offset.default</a>
+                        If both are defined, the stream-level definition takes precedence.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="streams-streams-samza-reset-offset">streams.<span class="stream">stream-id</span>.<br>samza.reset.offset</td>
+                    <td class="default">false</td>
+                    <td class="description">
                         If set to <code>true</code>, when a Samza container starts up, it ignores any
                         <a href="../container/checkpointing.html">checkpointed offset</a> for this particular input
                         stream. Its behavior is thus determined by the <code>samza.offset.default</code> setting.
@@ -787,9 +898,9 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="systems-streams-samza-priority">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.priority</td>
-                    <td>-1</td>
-                    <td>
+                    <td class="property" id="streams-streams-samza-priority">streams.<span class="stream">stream-id</span>.<br>samza.priority</td>
+                    <td class="default">-1</td>
+                    <td class="description">
                         If one or more streams have a priority set (any positive integer), they will be processed
                         with <a href="../container/streams.html#prioritizing-input-streams">higher priority</a> than the other streams.
                         You can set several streams to the same priority, or define multiple priority levels by
@@ -800,9 +911,9 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="systems-streams-samza-bootstrap">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.bootstrap</td>
-                    <td>false</td>
-                    <td>
+                    <td class="property" id="streams-streams-samza-bootstrap">streams.<span class="stream">stream-id</span>.<br>samza.bootstrap</td>
+                    <td class="default">false</td>
+                    <td class="description">
                         If set to <code>true</code>, this stream will be processed as a
                         <a href="../container/streams.html#bootstrapping">bootstrap stream</a>. This means that every time
                         a Samza container starts up, this stream will be fully consumed before messages from any
@@ -811,13 +922,12 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="task-consumer-batch-size">task.consumer.batch.size</td>
-                    <td>1</td>
-                    <td>
-                        If set to a positive integer, the task will try to consume
-                        <a href="../container/streams.html#batching">batches</a> with the given number of messages
-                        from each input stream, rather than consuming round-robin from all the input streams on
-                        each individual message. Setting this property can improve performance in some cases.
+                    <td class="property" id="streams-properties">streams.<span class="stream">stream-id</span>.*</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        Any properties of the stream. These are typically system-specific and can be used by the system
+                        for stream creation or validation. Note that the other properties are prefixed with <em>samza.</em>
+                        which distinguishes them as Samza properties that are not system-specific.
                     </td>
                 </tr>
 
@@ -835,6 +945,8 @@
                         <span class="serde">serde-name</span> you want, and reference that name in properties like
                         <a href="#systems-samza-key-serde" class="property">systems.*.samza.key.serde</a>,
                         <a href="#systems-samza-msg-serde" class="property">systems.*.samza.msg.serde</a>,
+                        <a href="#streams-samza-key-serde" class="property">streams.*.samza.key.serde</a>,
+                        <a href="#streams-samza-msg-serde" class="property">streams.*.samza.msg.serde</a>,
                         <a href="#stores-key-serde" class="property">stores.*.key.serde</a> and
                         <a href="#stores-msg-serde" class="property">stores.*.msg.serde</a>.
                         The value of this property is the fully-qualified name of a Java class that implements
@@ -1445,7 +1557,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="cluster-manager-container-memory-mb">cluster-manager.container.memory.mb</td>
+                    <td class="property" id="cluster-manager-container-memory-mb">cluster-manager.container.<br>memory.mb</td>
                     <td class="default">1024</td>
                     <td class="description">
                         How much memory, in megabytes, to request from the cluster manager per container of your job. Along with
@@ -1461,7 +1573,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="cluster-manager-container-cpu-cores">cluster-manager.container.cpu.cores</td>
+                    <td class="property" id="cluster-manager-container-cpu-cores">cluster-manager.container.<br>cpu.cores</td>
                     <td class="default">1</td>
                     <td class="description">
                         The number of CPU cores to request per container of your job. Each node in the
@@ -1501,7 +1613,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="cluster-manager-jmx-enabled">cluster-manager.jobcoordinator.jmx.enabled</td>
+                    <td class="property" id="cluster-manager-jmx-enabled">cluster-manager.jobcoordinator.<br>jmx.enabled</td>
                     <td class="default">true</td>
                     <td class="description">
                         Determines whether a JMX server should be started on the job's JobCoordinator.
@@ -1510,7 +1622,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="cluster-manager-allocator-sleep-ms">cluster-manager.allocator.sleep.ms</td>
+                    <td class="property" id="cluster-manager-allocator-sleep-ms">cluster-manager.allocator.<br>sleep.ms</td>
                     <td class="default">3600</td>
                     <td class="description">
                         The container allocator thread is responsible for matching requests to allocated containers.
@@ -1519,7 +1631,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="cluster-manager-container-request-timeout-ms">cluster-manager.container.request.timeout.ms</td>
+                    <td class="property" id="cluster-manager-container-request-timeout-ms">cluster-manager.container.<br>request.timeout.ms</td>
                     <td class="default">5000</td>
                     <td class="description">
                         The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource.
@@ -1758,10 +1870,9 @@
                         <a href="../container/metrics.html">JSON encoding</a> for metrics; in order to use this
                         encoding, you also need to configure a serde for the metrics stream:
                         <ul>
-                            <li><a href="#systems-samza-msg-serde" class="property">systems.*.streams.*.samza.msg.serde</a>
-                                <code>= metrics-serde</code> (replacing the asterisks with the
-                                <span class="system">system-name</span> and <span class="stream">stream-name</span>
-                                of the metrics stream)</li>
+                            <li><a href="#systems-samza-msg-serde" class="property">streams.*.samza.msg.serde</a>
+                                <code>= metrics-serde</code> (replacing the asterisk with the
+                                <span class="stream">stream-name</span> of the metrics stream)</li>
                             <li><a href="#serializers-registry-class" class="property">serializers.registry.metrics-serde.class</a>
                                 <code>= org.apache.samza.serializers.MetricsSnapshotSerdeFactory</code>
                                 (registering the serde under a <span class="serde">serde-name</span> of
@@ -1789,37 +1900,37 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="hdfs-writer-class">systems.*.producer.hdfs.writer.class</td>
-                    <td class="default">org.apache.samza.system.hdfs.writer.BinarySequenceFileHdfsWriter</td>
+                    <td class="property" id="hdfs-writer-class">systems.<span class="system">system-name</span>.<br>.producer.hdfs.writer.class</td>
+                    <td class="default">org.apache.samza.system.hdfs.<br>writer.BinarySequenceFileHdfsWriter</td>
                     <td class="description">Fully-qualified class name of the HdfsWriter implementation this HDFS Producer system should use</td>
                 </tr>
                 <tr>
-                  <td class="property" id="hdfs-compression-type">systems.*.producer.hdfs.compression.type</td>
+                  <td class="property" id="hdfs-compression-type">systems.<span class="system">system-name</span>.<br>.producer.hdfs.compression.type</td>
                     <td class="default">none</td>
                     <td class="description">A human-readable label for the compression type to use, such as "gzip" "snappy" etc. This label will be interpreted differently (or ignored) depending on the nature of the HdfsWriter implementation.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-base-output-dir">systems.*.producer.hdfs.base.output.dir</td>
+                    <td class="property" id="hdfs-base-output-dir">systems.<span class="system">system-name</span>.<br>.producer.hdfs.base.output.dir</td>
                     <td class="default">/user/USERNAME/SYSTEMNAME</td>
                     <td class="description">The base output directory for HDFS writes. Defaults to the home directory of the user who ran the job, followed by the systemName for this HdfsSystemProducer as defined in the job.properties file.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-bucketer-class">systems.*.producer.hdfs.bucketer.class</td>
-                    <td class="default">org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer</td>
+                    <td class="property" id="hdfs-bucketer-class">systems.<span class="system">system-name</span>.<br>.producer.hdfs.bucketer.class</td>
+                    <td class="default">org.apache.samza.system.hdfs.<br>writer.JobNameDateTimeBucketer</td>
                     <td class="description">Fully-qualified class name of the Bucketer implementation that will manage HDFS paths and file names. Used to batch writes by time, or other similar partitioning methods.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-bucketer-date-path-format">systems.*.producer.hdfs.bucketer.date.path.format</td>
+                    <td class="property" id="hdfs-bucketer-date-path-format">systems.<span class="system">system-name</span>.<br>.producer.hdfs.bucketer.date.path.format</td>
                     <td class="default"yyyy_MM_dd></td>
                     <td class="description">A date format (using Java's SimpleDataFormat syntax) appropriate for use in an HDFS Path, which can configure time-based bucketing of output files.</td>
                 </tr>
                 <tr>
-                  <td class="property" id="hdfs-write-batch-size-bytes">systems.*.producer.hdfs.write.batch.size.bytes</td>
+                  <td class="property" id="hdfs-write-batch-size-bytes">systems.<span class="system">system-name</span>.<br>.producer.hdfs.write.batch.size.bytes</td>
                   <td class="default">268435456</td>
                   <td class="description">The number of bytes of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 256MB if not set.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-write-batch-size-records">systems.*.producer.hdfs.write.batch.size.records</td>
+                    <td class="property" id="hdfs-write-batch-size-records">systems.<span class="system">system-name</span>.<br>.producer.hdfs.write.batch.size.records</td>
                     <td class="default">262144</td>
                     <td class="description">The number of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 262144 if not set.</td>
                 </tr>
@@ -1829,37 +1940,37 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="hdfs-consumer-buffer-capacity">systems.*.consumer.bufferCapacity</td>
+                    <td class="property" id="hdfs-consumer-buffer-capacity">systems.<span class="system">system-name</span>.<br>.consumer.bufferCapacity</td>
                     <td class="default">10</td>
                     <td class="description">Capacity of the hdfs consumer buffer - the blocking queue used for storing messages. Larger buffer capacity typically leads to better throughput but consumes more memory.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-consumer-numMaxRetries">systems.*.consumer.numMaxRetries</td>
+                    <td class="property" id="hdfs-consumer-numMaxRetries">systems.<span class="system">system-name</span>.<br>.consumer.numMaxRetries</td>
                     <td class="default">10</td>
                     <td class="description">The number of retry attempts when there is a failure to fetch messages from HDFS, before the container fails.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-partitioner-whitelist">systems.*.partitioner.defaultPartitioner.whitelist</td>
+                    <td class="property" id="hdfs-partitioner-whitelist">systems.<span class="system">system-name</span>.<br>.partitioner.defaultPartitioner.whitelist</td>
                     <td class="default">.*</td>
                     <td class="description">White list used by directory partitioner to select files in a hdfs directory, in Java Pattern style.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-partitioner-blacklist">systems.*.partitioner.defaultPartitioner.blacklist</td>
+                    <td class="property" id="hdfs-partitioner-blacklist">systems.<span class="system">system-name</span>.<br>.partitioner.defaultPartitioner.blacklist</td>
                     <td class="default"></td>
                     <td class="description">Black list used by directory partitioner to filter out unwanted files in a hdfs directory, in Java Pattern style.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-partitioner-group-pattern">systems.*.partitioner.defaultPartitioner.groupPattern</td>
+                    <td class="property" id="hdfs-partitioner-group-pattern">systems.<span class="system">system-name</span>.<br>.partitioner.defaultPartitioner.groupPattern</td>
                     <td class="default"></td>
                     <td class="description">Group pattern used by directory partitioner for advanced partitioning. The advanced partitioning goes beyond the basic assumption that each file is a partition. With advanced partitioning you can group files into partitions arbitrarily. For example, if you have a set of files as [part-01-a.avro, part-01-b.avro, part-02-a.avro, part-02-b.avro, part-03-a.avro], and you want to organize the partitions as (part-01-a.avro, part-01-b.avro), (part-02-a.avro, part-02-b.avro), (part-03-a.avro), where the numbers in the middle act as a "group identifier", you can then set this property to be "part-[id]-.*" (note that "[id]" is a reserved term here, i.e. you have to literally put it as "[id]"). The partitioner will apply this pattern to all file names and extract the "group identifier" ("[id]" in the pattern), then use the "group identifier" to group files into partitions. See more details in <a href="https://issues.apache.org/jira/secure/attachment/
 12827670/HDFSSystemConsumer.pdf">HdfsSystemConsumer design doc</a> </td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-consumer-reader-type">systems.*.consumer.reader</td>
+                    <td class="property" id="hdfs-consumer-reader-type">systems.<span class="system">system-name</span>.<br>.consumer.reader</td>
                     <td class="default">avro</td>
                     <td class="description">Type of the file reader for different event formats (avro, plain, json, etc.). "avro" is only type supported for now.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-staging-directory">systems.*.stagingDirectory</td>
+                    <td class="property" id="hdfs-staging-directory">systems.<span class="system">system-name</span>.<br>.stagingDirectory</td>
                     <td class="default"></td>
                     <td class="description">Staging directory for storing partition description. By default (if not set by users) the value is inherited from "yarn.job.staging.directory" internally. The default value is typically good enough unless you want explicitly use a separate location.</td>
                 </tr>

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/docs/learn/documentation/versioned/jobs/logging.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/logging.md b/docs/learn/documentation/versioned/jobs/logging.md
index 6d65984..44eeb3c 100644
--- a/docs/learn/documentation/versioned/jobs/logging.md
+++ b/docs/learn/documentation/versioned/jobs/logging.md
@@ -139,6 +139,11 @@ to log4j.xml and define the system name by specifying the config:
 task.log4j.system="<system-name>"
 {% endhighlight %}
 
+The default stream name for the logger is generated using the following convention, though you can override it using the `StreamName` property in log4j.xml as shown above.
+```java
+"__samza_%s_%s_logs" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
+```
+
 Configuring the StreamAppender will automatically encode messages using logstash's [Log4J JSON format](https://github.com/logstash/log4j-jsonevent-layout). Samza also supports pluggable serialization for those that prefer non-JSON logging events. This can be configured the same way other stream serializers are defined:
 
 {% highlight jproperties %}
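
Returning to the default stream-name convention shown earlier in this change, the following small
sketch shows the name the convention produces for a made-up job name and id:

    // Hypothetical values: job.name = "my_job", job.id = "1"
    String jobName = "my_job";
    String jobId = "1";
    String logStream = String.format("__samza_%s_%s_logs",
        jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"));
    // logStream is "__samza_my-job_1_logs"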

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
index ad37eb3..8444c91 100644
--- a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
+++ b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.system;
 
+import java.lang.reflect.Constructor;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.operators.StreamGraphBuilder;
@@ -26,6 +27,9 @@ import org.apache.samza.config.Config;
 
 /**
  * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}
+ *
+ * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
+ * to support the {@link ExecutionEnvironment#fromConfig(Config)} static constructor.
  */
 @InterfaceStability.Unstable
 public interface ExecutionEnvironment {
@@ -46,13 +50,17 @@ public interface ExecutionEnvironment {
   /**
    * Static method to load the non-standalone environment.
    *
+   * Requires the implementation class to define a constructor with a single {@link Config} as the argument.
+   *
    * @param config  configuration passed in to initialize the Samza processes
    * @return  the configure-driven {@link ExecutionEnvironment} to run the user-defined stream applications
    */
   static ExecutionEnvironment fromConfig(Config config) {
     try {
-      if (ExecutionEnvironment.class.isAssignableFrom(Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)))) {
-        return (ExecutionEnvironment) Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)).newInstance();
+      Class<?> environmentClass = Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS));
+      if (ExecutionEnvironment.class.isAssignableFrom(environmentClass)) {
+        Constructor<?> constructor = environmentClass.getConstructor(Config.class); // implementations must expose a public single-Config constructor
+        return (ExecutionEnvironment) constructor.newInstance(config);
       }
     } catch (Exception e) {
       throw new ConfigException(String.format("Problem in loading ExecutionEnvironment class %s", config.get(ENVIRONMENT_CONFIG)), e);
@@ -70,4 +78,24 @@ public interface ExecutionEnvironment {
    */
   void run(StreamGraphBuilder graphBuilder, Config config);
 
+  /**
+   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
+   *
+   * The stream configurations are read from the following properties in the config:
+   * {@code streams.{$streamId}.*}
+   * <br>
+   * All properties matching this pattern are assumed to be system-specific with two exceptions. The following two
+   * properties are Samza properties which are used to bind the stream to a system and a physical resource on that system.
+   *
+   * <ul>
+   *   <li>samza.system -         The name of the System on which this stream will be used. If this property isn't defined
+   *                              the stream will be associated with the System defined in {@code job.default.system}</li>
+   *   <li>samza.physical.name -  The system-specific name for this stream. It could be a file URN, topic name, or other identifier.
+   *                              If this property isn't defined the physical.name will be set to the streamId</li>
+   * </ul>
+   *
+   * @param streamId  The logical identifier for the stream in Samza.
+   * @return          The {@link StreamSpec} instance.
+   */
+  StreamSpec streamFromConfig(String streamId);
 }
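
A minimal usage sketch of the reflective factory and the new streamFromConfig method follows. The
stream id and system name are illustrative, and the config is assumed to resolve to an
ExecutionEnvironment implementation that exposes the required single-Config constructor.

    import java.util.Collections;
    import org.apache.samza.config.Config;
    import org.apache.samza.config.MapConfig;
    import org.apache.samza.system.ExecutionEnvironment;
    import org.apache.samza.system.StreamSpec;

    public class ExecutionEnvironmentSketch {
      public static void main(String[] args) {
        // Illustrative config: binds the logical stream id "page-views" to the "kafka" system.
        Config config = new MapConfig(Collections.singletonMap("streams.page-views.samza.system", "kafka"));

        // fromConfig reflectively invokes the configured implementation's single-Config constructor.
        ExecutionEnvironment env = ExecutionEnvironment.fromConfig(config);

        // Resolves streams.page-views.* into a StreamSpec, falling back to job.default.system for the system.
        StreamSpec pageViews = env.streamFromConfig("page-views");
      }
    }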

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index e953d46..5711a8b 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -137,13 +137,8 @@ public class StreamSpec {
    * @param config          A map of properties for the stream. These may be system-specific.
    */
   public StreamSpec(String id, String physicalName, String systemName, int partitionCount,  Map<String, String> config) {
-    if (id == null) {
-      throw new NullPointerException("Parameter 'id' must not be null");
-    }
-
-    if (systemName == null) {
-      throw new NullPointerException("Parameter 'systemName' must not be null");
-    }
+    validateLogicalIdentifier("id", id);
+    validateLogicalIdentifier("systemName", systemName);
 
     if (partitionCount < 1) {
       throw new IllegalArgumentException("Parameter 'partitionCount' must be greater than 0");
@@ -200,4 +195,10 @@ public class StreamSpec {
   public String getOrDefault(String propertyName, String defaultValue) {
     return config.getOrDefault(propertyName, defaultValue);
   }
+
+  private void validateLogicalIdentifier(String identifierName, String identifierValue) {
+    if (!identifierValue.matches("[A-Za-z0-9_-]+")) {
+      throw new IllegalArgumentException(String.format("Identifier '%s' is '%s'. It must match the expression [A-Za-z0-9_-]+", identifierName, identifierValue));
+    }
+  }
 }
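
The identifier validation introduced above can be illustrated with a small, hedged sketch (the names
are made up): only the id and the system name must match [A-Za-z0-9_-]+, while the physical name is
left unconstrained.

    import java.util.Collections;
    import org.apache.samza.system.StreamSpec;

    public class StreamSpecValidationSketch {
      public static void main(String[] args) {
        // Valid: id and system name use only letters, digits, '_' and '-'; the physical name may contain '.'.
        StreamSpec ok = new StreamSpec("page-views", "queuing.PageViewEvent", "kafka", 1,
            Collections.<String, String>emptyMap());

        // Invalid: '.' in the logical id fails the [A-Za-z0-9_-]+ check and throws IllegalArgumentException.
        try {
          new StreamSpec("page.views", "PageViewEvent", "kafka", 1, Collections.<String, String>emptyMap());
        } catch (IllegalArgumentException expected) {
          System.out.println(expected.getMessage());
        }
      }
    }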

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
new file mode 100644
index 0000000..64d60b7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StreamConfig;
+
+
+public abstract class AbstractExecutionEnvironment implements ExecutionEnvironment {
+
+  private final Config config;
+
+  public AbstractExecutionEnvironment(Config config) {
+    if (config == null) {
+      throw new NullPointerException("Parameter 'config' cannot be null.");
+    }
+
+    this.config = config;
+  }
+
+  @Override
+  public StreamSpec streamFromConfig(String streamId) {
+    StreamConfig streamConfig = new StreamConfig(config);
+    String physicalName = streamConfig.getPhysicalName(streamId, streamId);
+
+    return streamFromConfig(streamId, physicalName);
+  }
+
+  /**
+   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
+   *
+   * The stream configurations are read from the following properties in the config:
+   * {@code streams.{$streamId}.*}
+   * <br>
+   * All properties matching this pattern are assumed to be system-specific with one exception. The following
+   * property is a Samza property which is used to bind the stream to a system.
+   *
+   * <ul>
+   *   <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined
+   *                      the stream will be associated with the System defined in {@code job.default.system}</li>
+   * </ul>
+   *
+   * @param streamId      The logical identifier for the stream in Samza.
+   * @param physicalName  The system-specific name for this stream. It could be a file URN, topic name, or other identifier.
+   * @return              The {@link StreamSpec} instance.
+   */
+  /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName) {
+    StreamConfig streamConfig = new StreamConfig(config);
+    String system = streamConfig.getSystem(streamId);
+
+    return streamFromConfig(streamId, physicalName, system);
+  }
+
+  /**
+   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
+   *
+   * The stream configurations are read from the following properties in the config:
+   * {@code streams.{$streamId}.*}
+   *
+   * @param streamId      The logical identifier for the stream in Samza.
+   * @param physicalName  The system-specific name for this stream. It could be a file URN, topic name, or other identifier.
+   * @param system        The name of the System on which this stream will be used.
+   * @return              The {@link StreamSpec} instance.
+   */
+  /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName, String system) {
+    StreamConfig streamConfig = new StreamConfig(config);
+    Map<String, String> properties = streamConfig.getStreamProperties(streamId);
+
+    return new StreamSpec(streamId, physicalName, system, properties);
+  }
+}
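
To show the resolution path end to end, here is a hedged sketch that uses the standalone environment
built on this class. The stream id, the topic name, and the property values are illustrative; the
comments describe the expected outcome rather than asserting on specific getters.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.samza.config.Config;
    import org.apache.samza.config.MapConfig;
    import org.apache.samza.system.StandaloneExecutionEnvironment;
    import org.apache.samza.system.StreamSpec;

    public class StreamFromConfigSketch {
      public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("job.default.system", "kafka");                              // fallback when streams.*.samza.system is absent
        props.put("streams.page-views.samza.physical.name", "PageViewEvent");  // physical resource behind the stream id
        props.put("streams.page-views.retention.ms", "86400000");              // system-specific property, passed through

        Config config = new MapConfig(props);
        StandaloneExecutionEnvironment env = new StandaloneExecutionEnvironment(config);

        // Expected result: id "page-views", physical name "PageViewEvent",
        // system "kafka" (inherited from job.default.system), plus the retention.ms property.
        StreamSpec spec = env.streamFromConfig("page-views");
      }
    }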

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
index fafa2cb..e592e66 100644
--- a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
@@ -24,7 +24,11 @@ import org.apache.samza.config.Config;
 /**
  * This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment
  */
-public class RemoteExecutionEnvironment implements ExecutionEnvironment {
+public class RemoteExecutionEnvironment extends AbstractExecutionEnvironment {
+
+  public RemoteExecutionEnvironment(Config config) {
+    super(config);
+  }
 
   @Override public void run(StreamGraphBuilder app, Config config) {
     // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
index f0f6ef2..71d60ef 100644
--- a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
@@ -28,7 +28,11 @@ import org.apache.samza.operators.StreamGraphImpl;
 /**
  * This class implements the {@link ExecutionEnvironment} that runs the applications in standalone environment
  */
-public class StandaloneExecutionEnvironment implements ExecutionEnvironment {
+public class StandaloneExecutionEnvironment extends AbstractExecutionEnvironment {
+
+  public StandaloneExecutionEnvironment(Config config) {
+    super(config);
+  }
 
   // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}
   StreamGraph createGraph(StreamGraphBuilder app, Config config) {

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index b64e406..9d6cbc2 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -43,6 +43,7 @@ object JobConfig {
   val SAMZA_FWK_PATH = "samza.fwk.path"
   val SAMZA_FWK_VERSION = "samza.fwk.version"
   val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
+  val JOB_DEFAULT_SYSTEM = "job.default.system"
   val JOB_CONTAINER_COUNT = "job.container.count"
   val jOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
   val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
@@ -104,6 +105,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse(
       throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution."))
 
+  def getDefaultSystem = getOption(JobConfig.JOB_DEFAULT_SYSTEM)
+
   def getContainerCount = {
     getOption(JobConfig.JOB_CONTAINER_COUNT) match {
       case Some(count) => count.toInt

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 0ccc7df..6a3ed4b 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -19,64 +19,189 @@
 
 package org.apache.samza.config
 
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.system.SystemStream
 import org.apache.samza.util.Logging
+
 import scala.collection.JavaConversions._
-import org.apache.samza.system.SystemStream
 
 object StreamConfig {
-  // stream config constants
+  // Samza configs for streams
+  val SAMZA_PROPERTY = "samza."
+  val SYSTEM =                  SAMZA_PROPERTY + "system"
+  val PHYSICAL_NAME =           SAMZA_PROPERTY + "physical.name"
+  val MSG_SERDE =               SAMZA_PROPERTY + "msg.serde"
+  val KEY_SERDE =               SAMZA_PROPERTY + "key.serde"
+  val CONSUMER_RESET_OFFSET =   SAMZA_PROPERTY + "reset.offset"
+  val CONSUMER_OFFSET_DEFAULT = SAMZA_PROPERTY + "offset.default"
+
+  val STREAMS_PREFIX = "streams."
+  val STREAM_ID_PREFIX = STREAMS_PREFIX + "%s."
+  val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM
+  val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME
+  val SAMZA_STREAM_PROPERTY_PREFIX = STREAM_ID_PREFIX + SAMZA_PROPERTY
+
   val STREAM_PREFIX = "systems.%s.streams.%s."
-  val MSG_SERDE = STREAM_PREFIX + "samza.msg.serde"
-  val KEY_SERDE = STREAM_PREFIX + "samza.key.serde"
-  val CONSUMER_RESET_OFFSET = STREAM_PREFIX + "samza.reset.offset"
-  val CONSUMER_OFFSET_DEFAULT = STREAM_PREFIX + "samza.offset.default"
 
   implicit def Config2Stream(config: Config) = new StreamConfig(config)
 }
 
 class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
-  def getStreamMsgSerde(systemStream: SystemStream) =
-    getNonEmptyOption(StreamConfig.MSG_SERDE format (systemStream.getSystem, systemStream.getStream))
-
-  def getStreamKeySerde(systemStream: SystemStream) =
-    getNonEmptyOption(StreamConfig.KEY_SERDE format (systemStream.getSystem, systemStream.getStream))
+  def getStreamMsgSerde(systemStream: SystemStream) = nonEmptyOption(getProperty(systemStream, StreamConfig.MSG_SERDE))
 
-  def getResetOffsetMap(systemName: String) = {
-    val subConf = config.subset("systems.%s.streams." format systemName, true)
-    subConf
-      .filterKeys(k => k.endsWith(".samza.reset.offset"))
-      .map(kv => {
-        val streamName = kv._1.replace(".samza.reset.offset", "")
-        val systemStream = new SystemStream(systemName, streamName)
-        val resetVal = getResetOffset(systemStream)
-        (systemStream, resetVal)
-      }).toMap
-  }
+  def getStreamKeySerde(systemStream: SystemStream) = nonEmptyOption(getProperty(systemStream, StreamConfig.KEY_SERDE))
 
   def getResetOffset(systemStream: SystemStream) =
-    getOption(StreamConfig.CONSUMER_RESET_OFFSET format (systemStream.getSystem, systemStream.getStream)) match {
+    Option(getProperty(systemStream, StreamConfig.CONSUMER_RESET_OFFSET)) match {
       case Some("true") => true
       case Some("false") => false
       case Some(resetOffset) =>
-        warn("Got a configuration for %s that is not true, or false (was %s). Defaulting to false." format (StreamConfig.CONSUMER_RESET_OFFSET format (systemStream.getSystem, systemStream.getStream), resetOffset))
+        warn("Got a .samza.reset.offset configuration for SystemStream %s that is not true, or false (was %s). Defaulting to false."
+          format (systemStream.toString, resetOffset))
         false
       case _ => false
     }
 
   def getDefaultStreamOffset(systemStream: SystemStream) =
-    getOption(StreamConfig.CONSUMER_OFFSET_DEFAULT format (systemStream.getSystem, systemStream.getStream))
+    Option(getProperty(systemStream, StreamConfig.CONSUMER_OFFSET_DEFAULT))
 
   /**
    * Returns a list of all SystemStreams that have a serde defined from the config file.
    */
   def getSerdeStreams(systemName: String) = {
     val subConf = config.subset("systems.%s.streams." format systemName, true)
-    subConf
+    val legacySystemStreams = subConf
       .keys
-      .filter(k => k.endsWith(".samza.msg.serde") || k.endsWith(".samza.key.serde"))
+      .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE))
       .map(k => {
         val streamName = k.substring(0, k.length - 16 /* .samza.XXX.serde length */ )
         new SystemStream(systemName, streamName)
       }).toSet
+
+    val systemStreams = subset(StreamConfig.STREAMS_PREFIX)
+      .keys
+      .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE))
+      .map(k => {
+        val streamId = k.substring(0, k.length - 16 /* .samza.XXX.serde length */ )
+        streamIdToSystemStream(streamId)
+      }).toSet
+
+    legacySystemStreams.union(systemStreams)
+  }
+
+  /**
+    * Gets the stream properties from the legacy config style:
+    * systems.{system}.streams.{streams}.*
+    *
+    * @param systemName the system name under which the properties are configured
+    * @param streamName the stream name
+    * @return           the map of properties for the stream
+    */
+  private def getSystemStreamProperties(systemName: String, streamName: String) = {
+    if (systemName == null || streamName == null) {
+      new MapConfig()
+    } else {
+      config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true)
+    }
+  }
+
+  /**
+    * Gets the properties for the specified streamId from the config.
+    * It first applies any legacy configs from this config location:
+    * systems.{system}.streams.{stream}.*
+    *
+    * It then overrides them with properties of the new config format:
+    * streams.{streamId}.*
+    *
+    * @param streamId the identifier for the stream in the config.
+    * @return         the merged map of config properties from both the legacy and new config styles
+    */
+  def getStreamProperties(streamId: String) = {
+    val allProperties = subset(StreamConfig.STREAM_ID_PREFIX format streamId)
+    val samzaProperties = allProperties.subset(StreamConfig.SAMZA_PROPERTY, false)
+    val filteredStreamProperties:java.util.Map[String, String] = allProperties.filterKeys(k => !samzaProperties.containsKey(k))
+    val inheritedLegacyProperties:java.util.Map[String, String] = getSystemStreamProperties(getSystem(streamId), getPhysicalName(streamId, streamId))
+    new MapConfig(java.util.Arrays.asList(inheritedLegacyProperties, filteredStreamProperties))
+  }
+
+  /**
+    * Gets the System associated with the specified streamId.
+    * It first looks for the property
+    * streams.{streamId}.system
+    *
+    * If no value was provided, it uses
+    * job.default.system
+    *
+    * @param streamId the identifier for the stream in the config.
+    * @return         the system name associated with the stream.
+    */
+  def getSystem(streamId: String) = {
+    getOption(StreamConfig.SYSTEM_FOR_STREAM_ID format streamId) match {
+      case Some(system) => system
+      case _ => config.getDefaultSystem.orNull
+    }
+  }
+
+  /**
+    * Gets the physical name for the specified streamId.
+    *
+    * @param streamId             the identifier for the stream in the config.
+    * @param defaultPhysicalName  the default to use if the physical name is missing.
+    * @return                     the physical identifier for the stream or the default if it is undefined.
+    */
+  def getPhysicalName(streamId: String, defaultPhysicalName: String) = {
+    get(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID format streamId, defaultPhysicalName)
+  }
+
+  /**
+    * Gets the specified property for a SystemStream.
+    *
+    * Note, because the translation is not perfect between SystemStream and streamId,
+    * this method is not identical to getProperty(streamId, property)
+    */
+  private def getProperty(systemStream: SystemStream, property: String): String = {
+    val streamVal = getStreamProperties(systemStreamToStreamId(systemStream)).get(property)
+
+    if (streamVal != null) {
+      streamVal
+    } else {
+      getSystemStreamProperties(systemStream.getSystem(), systemStream.getStream).get(property)
+    }
+  }
+
+  private def getStreamIds(): Iterable[String] = {
+    subset(StreamConfig.STREAMS_PREFIX).keys
+  }
+
+  private def getStreamIdsForSystem(system: String): Iterable[String] = {
+    getStreamIds().filter(streamId => system.equals(getSystem(streamId)))
+  }
+
+  private def systemStreamToStreamId(systemStream: SystemStream): String = {
+   val streamIds = getStreamIdsForSystem(systemStream.getSystem).filter(streamId => systemStream.getStream().equals(getPhysicalName(streamId, streamId)))
+    if (streamIds.size > 1) {
+      throw new IllegalStateException("There was more than one stream found for system stream %s" format(systemStream))
+    }
+
+    if (streamIds.isEmpty) {
+      null
+    } else {
+      streamIds.head
+    }
+  }
+
+  /**
+    * A streamId is translated to a SystemStream by looking up its System and physicalName. It
+    * will use the streamId as the stream name if the physicalName doesn't exist.
+    */
+  private def streamIdToSystemStream(streamId: String): SystemStream = {
+    new SystemStream(getSystem(streamId), getPhysicalName(streamId, streamId))
+  }
+
+  private def nonEmptyOption(value: String): Option[String] = {
+    if (value == null || value.isEmpty) {
+      None
+    } else {
+      Some(value)
+    }
   }
 }
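
For reference, a minimal Java sketch (not part of this patch) of how the merged stream properties resolve after this change. The class name, the stream id "page-views", and the non-samza.* property keys are made up for illustration:

import java.util.HashMap;
import java.util.Map;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;

public class StreamConfigSketch {
  public static void main(String[] args) {
    Map<String, String> map = new HashMap<>();
    // New-style config, keyed by streamId
    map.put("streams.page-views.samza.system", "kafka");
    map.put("streams.page-views.samza.physical.name", "PageViewEvent");
    map.put("streams.page-views.retention.ms", "86400000");
    // Legacy config, keyed by system and physical stream name
    map.put("systems.kafka.streams.PageViewEvent.replication.factor", "3");

    StreamConfig streamConfig = new StreamConfig(new MapConfig(map));
    // Legacy properties are inherited, streams.* properties override them,
    // and the samza.* control properties are filtered out:
    // {replication.factor=3, retention.ms=86400000}
    Map<String, String> props = streamConfig.getStreamProperties("page-views");
    System.out.println(props);
  }
}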

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
index 059afce..d988270 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
@@ -19,6 +19,9 @@
 
 package org.apache.samza.example;
 
+import java.time.Duration;
+import java.util.Set;
+import java.util.function.BiFunction;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
@@ -30,10 +33,6 @@ import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStreamPartition;
 
-import java.time.Duration;
-import java.util.function.BiFunction;
-import java.util.Set;
-
 
 /**
  * Example implementation of split stream tasks
@@ -69,7 +68,7 @@ public class TestBroadcastExample extends TestExampleBase {
     BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1;
     inputs.keySet().forEach(entry -> {
         MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(
-                new StreamSpec(entry.toString(), entry.getStream(), entry.getSystem()), null, null).map(this::getInputMessage);
+                new StreamSpec(entry.getSystem() + "-" + entry.getStream(), entry.getStream(), entry.getSystem()), null, null).map(this::getInputMessage);
 
         inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
             .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
index cc53814..f956972 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
@@ -64,7 +64,7 @@ public class TestJoinExample  extends TestExampleBase {
   public void init(StreamGraph graph, Config config) {
 
     for (SystemStream input : inputs.keySet()) {
-      StreamSpec inputStreamSpec = new StreamSpec(input.toString(), input.getStream(), input.getSystem());
+      StreamSpec inputStreamSpec = new StreamSpec(input.getSystem() + "-" + input.getStream(), input.getStream(), input.getSystem());
       MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream(
           inputStreamSpec, null, null).map(this::getInputMessage);
       if (joinOutput == null) {

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
index 73f4674..6896da5 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
@@ -59,7 +59,7 @@ public class TestWindowExample extends TestExampleBase {
   public void init(StreamGraph graph, Config config) {
     BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1;
     inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(
-            new StreamSpec(source.toString(), source.getStream(), source.getSystem()), null, null).
+            new StreamSpec(source.getSystem() + "-" + source.getStream(), source.getStream(), source.getSystem()), null, null).
         map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(),
             m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator)));
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java b/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java
new file mode 100644
index 0000000..861f049
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+public class TestAbstractExecutionEnvironment {
+  private static final String STREAM_ID = "t3st-Stream_Id";
+  private static final String STREAM_ID_INVALID = "test#Str3amId!";
+
+  private static final String TEST_PHYSICAL_NAME = "t3st-Physical_Name";
+  private static final String TEST_PHYSICAL_NAME2 = "testPhysicalName2";
+  private static final String TEST_PHYSICAL_NAME_SPECIAL_CHARS = "test://Physical.Name?";
+
+  private static final String TEST_SYSTEM = "t3st-System_Name";
+  private static final String TEST_SYSTEM2 = "testSystemName2";
+  private static final String TEST_SYSTEM_INVALID = "test:System!Name@";
+
+  private static final String TEST_DEFAULT_SYSTEM = "testDefaultSystemName";
+
+
+  @Test(expected = NullPointerException.class)
+  public void testConfigValidation() {
+    new TestAbstractExecutionEnvironmentImpl(null);
+  }
+
+  // The physical name should be pulled from the StreamConfig.PHYSICAL_NAME property value.
+  @Test
+  public void testStreamFromConfigWithPhysicalNameInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
+  }
+
+  // The streamId should be used as the physicalName when the physical name is not specified.
+  // NOTE: it's either this, null, or an exception. Using the streamId seems better for backward compatibility and API brevity.
+  @Test
+  public void testStreamFromConfigWithoutPhysicalNameInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(STREAM_ID, spec.getPhysicalName());
+  }
+
+  // If the system is specified at the stream scope, use it
+  @Test
+  public void testStreamFromConfigWithSystemAtStreamScopeInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // If system isn't specified at stream scope, use the default system
+  @Test
+  public void testStreamFromConfigWithSystemAtDefaultScopeInConfig() {
+    Config config = addConfigs(buildStreamConfig(STREAM_ID,
+                                                  StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME),
+                                JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName());
+  }
+
+  // Stream scope should override default scope
+  @Test
+  public void testStreamFromConfigWithSystemAtBothScopesInConfig() {
+    Config config = addConfigs(buildStreamConfig(STREAM_ID,
+                                                StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                                StreamConfig.SYSTEM(), TEST_SYSTEM),
+                                JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // System is required. Throw if it cannot be determined.
+  @Test(expected = Exception.class)
+  public void testStreamFromConfigWithOutSystemInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // The properties in the config "streams.{streamId}.*" should be passed through to the spec.
+  @Test
+  public void testStreamFromConfigPropertiesPassthrough() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                    StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                    StreamConfig.SYSTEM(), TEST_SYSTEM,
+                                    "systemProperty1", "systemValue1",
+                                    "systemProperty2", "systemValue2",
+                                    "systemProperty3", "systemValue3");
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    Map<String, String> properties = spec.getConfig();
+    assertEquals(3, properties.size());
+    assertEquals("systemValue1", properties.get("systemProperty1"));
+    assertEquals("systemValue2", properties.get("systemProperty2"));
+    assertEquals("systemValue3", properties.get("systemProperty3"));
+    assertEquals("systemValue1", spec.get("systemProperty1"));
+    assertEquals("systemValue2", spec.get("systemProperty2"));
+    assertEquals("systemValue3", spec.get("systemProperty3"));
+  }
+
+  // The samza properties (which are invalid for the underlying system) should be filtered out.
+  @Test
+  public void testStreamFromConfigSamzaPropertiesOmitted() {
+    Config config = buildStreamConfig(STREAM_ID,
+                              StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                    StreamConfig.SYSTEM(), TEST_SYSTEM,
+                                    "systemProperty1", "systemValue1",
+                                    "systemProperty2", "systemValue2",
+                                    "systemProperty3", "systemValue3");
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    Map<String, String> properties = spec.getConfig();
+    assertEquals(3, properties.size());
+    assertNull(properties.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID)));
+    assertNull(properties.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID)));
+    assertNull(spec.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID)));
+    assertNull(spec.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID)));
+  }
+
+  // When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config
+  @Test
+  public void testStreamFromConfigPhysicalNameArgSimple() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME);
+
+    assertEquals(STREAM_ID, spec.getId());
+    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // Special characters are allowed for the physical name
+  @Test
+  public void testStreamFromConfigPhysicalNameArgSpecialCharacters() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME_SPECIAL_CHARS);
+    assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName());
+  }
+
+  // Null is allowed for the physical name
+  @Test
+  public void testStreamFromConfigPhysicalNameArgNull() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID, null);
+    assertNull(spec.getPhysicalName());
+  }
+
+  // When the system name is provided explicitly, it should be used, regardless of whether it's also in the config
+  @Test
+  public void testStreamFromConfigSystemNameArgValid() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM2);              // This too
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM);
+
+    assertEquals(STREAM_ID, spec.getId());
+    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // Special characters are NOT allowed for system name, because it's used as an identifier in the config.
+  @Test(expected = IllegalArgumentException.class)
+  public void testStreamFromConfigSystemNameArgInvalid() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM2);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID);
+  }
+
+  // Empty strings are NOT allowed for system name, because it's used as an identifier in the config.
+  @Test(expected = IllegalArgumentException.class)
+  public void testStreamFromConfigSystemNameArgEmpty() {
+    Config config = buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+        StreamConfig.SYSTEM(), TEST_SYSTEM2);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, "");
+  }
+
+  // Null is not allowed for system name.
+  @Test(expected = NullPointerException.class)
+  public void testStreamFromConfigSystemNameArgNull() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM2);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, null);
+  }
+
+  // Special characters are NOT allowed for streamId, because it's used as an identifier in the config.
+  @Test(expected = IllegalArgumentException.class)
+  public void testStreamFromConfigStreamIdInvalid() {
+    Config config = buildStreamConfig(STREAM_ID_INVALID,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    env.streamFromConfig(STREAM_ID_INVALID);
+  }
+
+  // Empty strings are NOT allowed for streamId, because it's used as an identifier in the config.
+  @Test(expected = IllegalArgumentException.class)
+  public void testStreamFromConfigStreamIdEmpty() {
+    Config config = buildStreamConfig("",
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    env.streamFromConfig("");
+  }
+
+  // Null is not allowed for streamId.
+  @Test(expected = NullPointerException.class)
+  public void testStreamFromConfigStreamIdNull() {
+    Config config = buildStreamConfig(null,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    env.streamFromConfig(null);
+  }
+
+
+  // Helper methods
+
+  private Config buildStreamConfig(String streamId, String... kvs) {
+    // inject streams.x. into each key
+    for (int i = 0; i < kvs.length - 1; i += 2) {
+      kvs[i] = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId) + kvs[i];
+    }
+    return buildConfig(kvs);
+  }
+
+  private Config buildConfig(String... kvs) {
+    if (kvs.length % 2 != 0) {
+      throw new IllegalArgumentException("Keys and values must be provided in pairs");
+    }
+
+    Map<String, String> configMap = new HashMap<>();
+    for (int i = 0; i < kvs.length - 1; i += 2) {
+      configMap.put(kvs[i], kvs[i + 1]);
+    }
+    return new MapConfig(configMap);
+  }
+
+  private Config addConfigs(Config original, String... kvs) {
+    Map<String, String> result = new HashMap<>();
+    result.putAll(original);
+    result.putAll(buildConfig(kvs));
+    return new MapConfig(result);
+  }
+
+  private class TestAbstractExecutionEnvironmentImpl extends AbstractExecutionEnvironment {
+
+    public TestAbstractExecutionEnvironmentImpl(Config config) {
+      super(config);
+    }
+
+    @Override
+    public void run(StreamGraphBuilder graphBuilder, Config config) {
+      // do nothing
+    }
+  }
+}
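
The tests above pin down the resolution order behind streamFromConfig: the streamId is used as the physical name unless streams.{streamId}.samza.physical.name is set, and the system falls back to the job-level default system when no stream-scoped system is configured. A hypothetical usage sketch (MyEnvironment stands in for any concrete AbstractExecutionEnvironment subclass and is not part of this patch):

import java.util.HashMap;
import java.util.Map;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.system.AbstractExecutionEnvironment;
import org.apache.samza.system.StreamSpec;

public class StreamFromConfigSketch {
  public static void main(String[] args) {
    Map<String, String> map = new HashMap<>();
    String streamPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), "page-views");
    map.put(streamPrefix + StreamConfig.PHYSICAL_NAME(), "PageViewEvent");
    map.put(JobConfig.JOB_DEFAULT_SYSTEM(), "kafka");  // no stream-scoped system, so the default applies

    AbstractExecutionEnvironment env = new MyEnvironment(new MapConfig(map));  // MyEnvironment is hypothetical
    StreamSpec spec = env.streamFromConfig("page-views");
    // spec.getId()           -> "page-views"
    // spec.getPhysicalName() -> "PageViewEvent"
    // spec.getSystemName()   -> "kafka" (from the job default system)
  }
}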

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index e355e7e..7e9f18a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -36,6 +36,7 @@ import java.util
 import scala.collection.JavaConverters._
 import org.apache.samza.system.kafka.KafkaSystemFactory
 import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.kafka.common.serialization.ByteArraySerializer
 
 object KafkaConfig {
@@ -165,13 +166,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     kafkaChangeLogProperties
   }
 
-  def getTopicKafkaProperties(systemName: String, streamName: String) = {
-    val filteredConfigs = config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true)
-    val topicProperties = new Properties
-    filteredConfigs.foreach { kv => topicProperties.setProperty(kv._1, kv._2) }
-    topicProperties
-  }
-
   // kafka config
   def getKafkaSystemConsumerConfig( systemName: String,
                                     clientId: String,

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index 59015a9..0f0d792 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -19,6 +19,9 @@
 
 package org.apache.samza.config;
 
+import org.apache.samza.system.SystemStream;
+
+
 /**
  * This class contains the methods for getting properties that are needed by the
  * StreamAppender.
@@ -36,7 +39,7 @@ public class Log4jSystemConfig extends JavaSystemConfig {
    * Defines whether or not to include file location information for Log4J
    * appender messages. File location information includes the method, line
    * number, class, etc.
-   * 
+   *
    * @return If true, will include file location (method, line number, etc)
    *         information in Log4J appender messages.
    */
@@ -68,7 +71,7 @@ public class Log4jSystemConfig extends JavaSystemConfig {
 
   /**
    * Get the class name according to the serde name.
-   * 
+   *
    * @param name serde name
    * @return serde factory name, or null if there is no factory defined for the
    *         supplied serde name.
@@ -78,7 +81,8 @@ public class Log4jSystemConfig extends JavaSystemConfig {
   }
 
   public String getStreamSerdeName(String systemName, String streamName) {
-    String streamSerdeNameConfig = String.format(StreamConfig.MSG_SERDE(), systemName, streamName);
-    return get(streamSerdeNameConfig, null);
+    StreamConfig streamConfig = new StreamConfig(this);
+    scala.Option<String> option = streamConfig.getStreamMsgSerde(new SystemStream(systemName, streamName));
+    return option.isEmpty() ? null : option.get();
   }
 }


[4/9] samza git commit: SAMZA-1104; fix yarn security page link from index.html page

Posted by jm...@apache.org.
SAMZA-1104; fix yarn security page link from index.html page

Author: Chen Song <cs...@appnexus.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #62 from garlicbulb-puzhuo/SAMZA-1104


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/098a11d7
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/098a11d7
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/098a11d7

Branch: refs/heads/samza-fluent-api-v1
Commit: 098a11d773590e6323fae21d534c3420f622ec55
Parents: 65df0b5
Author: Chen Song <cs...@appnexus.com>
Authored: Mon Feb 27 17:31:29 2017 -0800
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Mon Feb 27 17:31:29 2017 -0800

----------------------------------------------------------------------
 docs/learn/documentation/versioned/index.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/098a11d7/docs/learn/documentation/versioned/index.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/index.html b/docs/learn/documentation/versioned/index.html
index 6651b3b..e3cecc1 100644
--- a/docs/learn/documentation/versioned/index.html
+++ b/docs/learn/documentation/versioned/index.html
@@ -83,7 +83,7 @@ title: Documentation
   <li><a href="yarn/yarn-host-affinity.html">Host Affinity & Yarn</a></li>
   <li><a href="hdfs/producer.html">Writing to HDFS</a></li>
   <li><a href="hdfs/consumer.html">Reading from HDFS</a></li>
-  <li><a href="hdfs/yarn-security.html">Yarn Security</a></li>
+  <li><a href="yarn/yarn-security.html">Yarn Security</a></li>
 <!-- TODO write yarn pages
   <li><a href="">Fault Tolerance</a></li>
   <li><a href="">Security</a></li>


[2/9] samza git commit: SAMZA-1102: Zk controller

Posted by jm...@apache.org.
SAMZA-1102: Zk controller

SAMZA-1102: Added ZKController and ZkControllerImpl

Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Author: navina <na...@apache.org>

Reviewers: Navina Ramesh <na...@apache.org>, Fred Ji <fj...@apache.org>, Xinyu Liu <xi...@linkedin.com>

Closes #50 from sborya/ZkController


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f1bc1d0b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f1bc1d0b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f1bc1d0b

Branch: refs/heads/samza-fluent-api-v1
Commit: f1bc1d0b36242170930c0001c9efa7e5c24f8dd0
Parents: e6147fd
Author: Boris Shkolnik <bo...@apache.org>
Authored: Thu Feb 23 14:02:05 2017 -0800
Committer: navina <na...@apache.org>
Committed: Thu Feb 23 14:02:05 2017 -0800

----------------------------------------------------------------------
 .../processor/SamzaContainerController.java     |   1 +
 .../apache/samza/processor/StreamProcessor.java |  10 +-
 .../java/org/apache/samza/zk/ZkController.java  |  32 ++++
 .../org/apache/samza/zk/ZkControllerImpl.java   | 163 +++++++++++++++++++
 .../apache/samza/zk/ZkControllerListener.java   |  34 ++++
 .../java/org/apache/samza/zk/ZkKeyBuilder.java  |  22 ++-
 .../org/apache/samza/zk/ZkLeaderElector.java    |  36 ++--
 .../main/java/org/apache/samza/zk/ZkUtils.java  |  49 ++++++
 .../org/apache/samza/zk/TestZkKeyBuilder.java   |   4 +-
 .../apache/samza/zk/TestZkLeaderElector.java    | 152 ++++++++++++++---
 .../java/org/apache/samza/zk/TestZkUtils.java   | 105 ++++++++++--
 11 files changed, 549 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
index d448d30..76e2053 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
@@ -60,6 +60,7 @@ public class SamzaContainerController {
    * @param taskFactory         Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or
    *                            {@link org.apache.samza.task.AsyncStreamTask}
    * @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances
+   * @param processorId         Id of the processor
    * @param metricsReporterMap  Map of metric reporter name and {@link MetricsReporter} instance
    */
   public SamzaContainerController(

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 5e90c56..4d3e8ab 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -94,9 +94,14 @@ public class StreamProcessor {
     this(processorId, config, customMetricsReporters, (Object) asyncStreamTaskFactory);
   }
 
+
   /**
-   * Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created
+   * Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created
    * using the provided {@link StreamTaskFactory}.
+   * @param processorId the id of this processor
+   * @param config the job config
+   * @param customMetricsReporters map of metrics reporter name to reporter instance
+   * @param streamTaskFactory the factory used to create task instances
    */
   public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
                          StreamTaskFactory streamTaskFactory) {
@@ -106,6 +111,9 @@ public class StreamProcessor {
   /**
    * Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created
    * using the "task.class" configuration instead of a task factory.
+   * @param processorId the id of this processor
+   * @param config the job config
+   * @param customMetricsReporters map of metrics reporter name to reporter instance
    */
   public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters) {
     this(processorId, config, customMetricsReporters, (Object) null);

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
new file mode 100644
index 0000000..20c62cf
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.samza.zk;
+
+/**
+ * API to the coordination functionality provided by ZK.
+ */
+public interface ZkController {
+  void register();
+  boolean isLeader();
+  void notifyJobModelChange(String version);
+  void stop();
+  void listenToProcessorLiveness();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
new file mode 100644
index 0000000..70c8a37
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.samza.SamzaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+
+public class ZkControllerImpl implements ZkController {
+  private static final Logger LOG = LoggerFactory.getLogger(ZkControllerImpl.class);
+
+  private final String processorIdStr;
+  private final ZkUtils zkUtils;
+  private final ZkControllerListener zkControllerListener;
+  private final ZkLeaderElector leaderElector;
+  private final ScheduleAfterDebounceTime debounceTimer;
+
+  public ZkControllerImpl(String processorIdStr, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer,
+      ZkControllerListener zkControllerListener) {
+    this.processorIdStr = processorIdStr;
+    this.zkUtils = zkUtils;
+    this.zkControllerListener = zkControllerListener;
+    this.leaderElector = new ZkLeaderElector(processorIdStr, zkUtils,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            onBecomeLeader();
+          }
+        }
+    );
+    this.debounceTimer = debounceTimer;
+
+    init();
+  }
+
+  private void init() {
+    ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
+    zkUtils.makeSurePersistentPathsExists(
+        new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder
+            .getJobModelPathPrefix()});
+  }
+
+  private void onBecomeLeader() {
+
+    listenToProcessorLiveness(); // subscribe for adding new processors
+
+    // inform the caller
+    zkControllerListener.onBecomeLeader();
+
+  }
+
+  @Override
+  public void register() {
+
+    // TODO - make a loop here with some number of attempts.
+    // possibly split into two method - becomeLeader() and becomeParticipant()
+    leaderElector.tryBecomeLeader();
+
+    // subscribe to JobModel version updates
+    zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(debounceTimer));
+  }
+
+  @Override
+  public boolean isLeader() {
+    return leaderElector.amILeader();
+  }
+
+  @Override
+  public void notifyJobModelChange(String version) {
+    zkControllerListener.onNewJobModelAvailable(version);
+  }
+
+  @Override
+  public void stop() {
+    if (isLeader()) {
+      leaderElector.resignLeadership();
+    }
+    zkUtils.close();
+  }
+
+  @Override
+  public void listenToProcessorLiveness() {
+    zkUtils.subscribeToProcessorChange(new ZkProcessorChangeHandler(debounceTimer));
+  }
+
+  // Only by Leader
+  class ZkProcessorChangeHandler implements IZkChildListener {
+    private final ScheduleAfterDebounceTime debounceTimer;
+    public ZkProcessorChangeHandler(ScheduleAfterDebounceTime debounceTimer) {
+      this.debounceTimer = debounceTimer;
+    }
+    /**
+     * Called when the children of the given path changed.
+     *
+     * @param parentPath    The parent path
+     * @param currentChilds The children or null if the root node (parent path) was deleted.
+     * @throws Exception
+     */
+    @Override
+    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+      LOG.info(
+          "ZkControllerImpl::ZkProcessorChangeHandler::handleChildChange - Path: " + parentPath + "  Current Children: "
+              + currentChilds);
+      debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
+          ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> zkControllerListener.onProcessorChange(currentChilds));
+    }
+  }
+
+  class ZkJobModelVersionChangeHandler implements IZkDataListener {
+    private final ScheduleAfterDebounceTime debounceTimer;
+    public ZkJobModelVersionChangeHandler(ScheduleAfterDebounceTime debounceTimer) {
+      this.debounceTimer = debounceTimer;
+    }
+    /**
+     * Called when the JobModel version is updated in ZK.
+     * @param dataPath the path of the znode that changed
+     * @param data     the new JobModel version
+     * @throws Exception
+     */
+    @Override
+    public void handleDataChange(String dataPath, Object data) throws Exception {
+      LOG.info("pid=" + processorIdStr + ". Got notification on version update change. path=" + dataPath + "; data="
+          + (String) data);
+
+      debounceTimer
+          .scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () -> notifyJobModelChange((String) data));
+    }
+    @Override
+    public void handleDataDeleted(String dataPath) throws Exception {
+      throw new SamzaException("version update path has been deleted!");
+    }
+  }
+
+  public void shutdown() {
+    if (debounceTimer != null)
+      debounceTimer.stopScheduler();
+
+    if (zkUtils != null)
+      zkUtils.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java
new file mode 100644
index 0000000..f7fedd7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import java.util.List;
+
+
+/**
+ * Callbacks from the ZkController to its caller.
+ */
+public interface ZkControllerListener {
+  void onBecomeLeader();
+  void onProcessorChange(List<String> processorIds);
+
+  void onNewJobModelAvailable(String version); // start job model update (stop current work)
+  void onNewJobModelConfirmed(String version); // start new work according to the new model
+}
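
A hypothetical wiring sketch for the new controller API (not part of this patch); zkUtils and debounceTimer are assumed to be constructed elsewhere:

import java.util.List;

import org.apache.samza.zk.ScheduleAfterDebounceTime;
import org.apache.samza.zk.ZkController;
import org.apache.samza.zk.ZkControllerImpl;
import org.apache.samza.zk.ZkControllerListener;
import org.apache.samza.zk.ZkUtils;

public class ZkControllerSketch {
  static ZkController startController(String processorId, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer) {
    ZkControllerListener listener = new ZkControllerListener() {
      @Override public void onBecomeLeader() {
        // leader-only work, e.g. compute and publish a new JobModel version
      }
      @Override public void onProcessorChange(List<String> processorIds) {
        // leader reacts to processors joining or leaving (already debounced by the controller)
      }
      @Override public void onNewJobModelAvailable(String version) {
        // stop current work and prepare for the new JobModel
      }
      @Override public void onNewJobModelConfirmed(String version) {
        // start work according to the new JobModel
      }
    };
    ZkController controller = new ZkControllerImpl(processorId, zkUtils, debounceTimer, listener);
    controller.register();  // tries to become leader and subscribes to JobModel version changes
    return controller;
  }
}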

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index 28344e9..d6cb9f3 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -19,8 +19,8 @@
 
 package org.apache.samza.zk;
 
-import com.google.common.base.Strings;
 import org.apache.samza.SamzaException;
+import com.google.common.base.Strings;
 
 /**
  * The following ZK hierarchy is maintained for Standalone jobs:
@@ -44,7 +44,7 @@ public class ZkKeyBuilder {
   private final String pathPrefix;
 
   static final String PROCESSORS_PATH = "processors";
-  static final String PROCESSOR_ID_PREFIX = "processor-";
+  public static final String JOBMODEL_VERSION_PATH = "jobModelVersion";
 
   public ZkKeyBuilder(String pathPrefix) {
     if (Strings.isNullOrEmpty(pathPrefix)) {
@@ -53,6 +53,10 @@ public class ZkKeyBuilder {
     this.pathPrefix = pathPrefix.trim();
   }
 
+  public String getRootPath() {
+    return "/" + pathPrefix;
+  }
+
   public String getProcessorsPath() {
     return String.format("/%s/%s", pathPrefix, PROCESSORS_PATH);
   }
@@ -71,4 +75,18 @@ public class ZkKeyBuilder {
       return path.substring(path.lastIndexOf("/") + 1);
     return null;
   }
+
+  public String getJobModelVersionPath() {
+    return String.format("/%s/%s", pathPrefix, JOBMODEL_VERSION_PATH);
+  }
+
+  public String getJobModelPathPrefix() {
+    return String.format("/%s/jobModels", pathPrefix);
+  }
+
+  public String getJobModelPath(String jobModelVersion) {
+    return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion);
+  }
+
+
 }
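
A quick sketch (not part of this patch) of the paths the updated ZkKeyBuilder produces, for a made-up prefix "my-app":

import org.apache.samza.zk.ZkKeyBuilder;

public class ZkKeyBuilderSketch {
  public static void main(String[] args) {
    ZkKeyBuilder keyBuilder = new ZkKeyBuilder("my-app");
    System.out.println(keyBuilder.getRootPath());             // /my-app
    System.out.println(keyBuilder.getProcessorsPath());       // /my-app/processors
    System.out.println(keyBuilder.getJobModelVersionPath());  // /my-app/jobModelVersion
    System.out.println(keyBuilder.getJobModelPathPrefix());   // /my-app/jobModels
    System.out.println(keyBuilder.getJobModelPath("3"));      // /my-app/jobModels/3
  }
}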

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index 8cdf8fc..b9bdf11 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -50,25 +50,30 @@ public class ZkLeaderElector implements LeaderElector {
   private final String hostName;
 
   private AtomicBoolean isLeader = new AtomicBoolean(false);
-  private final IZkDataListener zkLeaderElectionListener;
+  private final IZkDataListener previousProcessorChangeListener;
+  ZkLeaderElectorListener zkLeaderElectorListener;
   private String currentSubscription = null;
   private final Random random = new Random();
 
   @VisibleForTesting
-  ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, IZkDataListener leaderElectionListener) {
+  ZkLeaderElector(String processorIdStr,
+      ZkUtils zkUtils,
+      ZkLeaderElectorListener zkLeaderElectorListener,
+      IZkDataListener previousProcessorChangeListener) {
     this.processorIdStr = processorIdStr;
     this.zkUtils = zkUtils;
-    this.zkLeaderElectionListener = leaderElectionListener;
     this.keyBuilder = this.zkUtils.getKeyBuilder();
     this.hostName = getHostName();
+    this.zkLeaderElectorListener = zkLeaderElectorListener; // listener to inform the caller that they have become the leader
+    if (previousProcessorChangeListener == null)
+      this.previousProcessorChangeListener =  new PreviousProcessorChangeListener();
+    else
+      this.previousProcessorChangeListener = previousProcessorChangeListener;
   }
 
-  public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils) {
-    this.zkLeaderElectionListener = new ZkLeaderElectionListener();
-    this.processorIdStr = processorIdStr;
-    this.zkUtils = zkUtils;
-    this.keyBuilder = this.zkUtils.getKeyBuilder();
-    this.hostName = getHostName();
+  public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, ZkLeaderElectorListener zkLeaderElectorListener) {
+    this(processorIdStr, zkUtils, zkLeaderElectorListener, null);
+
   }
 
   // TODO: This should go away once we integrate with Zk based Job Coordinator
@@ -81,6 +86,10 @@ public class ZkLeaderElector implements LeaderElector {
     }
   }
 
+  public interface ZkLeaderElectorListener {
+    void onBecomingLeader();
+  }
+
   @Override
   public boolean tryBecomeLeader() {
     String currentPath = zkUtils.registerProcessorAndGetId(hostName);
@@ -96,6 +105,7 @@ public class ZkLeaderElector implements LeaderElector {
     if (index == 0) {
       isLeader.getAndSet(true);
       LOGGER.info(zLog("Eligible to become the leader!"));
+      zkLeaderElectorListener.onBecomingLeader(); // inform the caller
       return true;
     }
 
@@ -105,11 +115,13 @@ public class ZkLeaderElector implements LeaderElector {
     if (!predecessor.equals(currentSubscription)) {
       if (currentSubscription != null) {
         LOGGER.debug(zLog("Unsubscribing data change for " + currentSubscription));
-        zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, zkLeaderElectionListener);
+        zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
+            previousProcessorChangeListener);
       }
       currentSubscription = predecessor;
       LOGGER.info(zLog("Subscribing data change for " + predecessor));
-      zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, zkLeaderElectionListener);
+      zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
+          previousProcessorChangeListener);
     }
     /**
      * Verify that the predecessor still exists. This step is needed because the ZkClient subscribes for data changes
@@ -146,7 +158,7 @@ public class ZkLeaderElector implements LeaderElector {
   }
 
   // Only by non-leaders
-  class ZkLeaderElectionListener implements IZkDataListener {
+  class PreviousProcessorChangeListener implements IZkDataListener {
 
     @Override
     public void handleDataChange(String dataPath, Object data) throws Exception {

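Since ZkLeaderElectorListener has a single method, a lambda also works for callers that only need the leadership callback. A small sketch (not part of this patch; zkUtils is assumed to be built elsewhere):

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.samza.zk.ZkLeaderElector;
import org.apache.samza.zk.ZkUtils;

public class LeaderElectorSketch {
  static void tryElection(ZkUtils zkUtils) {
    AtomicBoolean becameLeader = new AtomicBoolean(false);
    ZkLeaderElector elector = new ZkLeaderElector("processor-1", zkUtils, () -> becameLeader.set(true));
    elector.tryBecomeLeader();  // the listener fires only if this processor is first in the sorted list
  }
}
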
http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index d0a269d..b11e02f 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.zk;
 
+import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
@@ -58,6 +59,7 @@ public class ZkUtils {
   private volatile String ephemeralPath = null;
   private final ZkKeyBuilder keyBuilder;
   private final int connectionTimeoutMs;
+  private final String processorId = "TO BE PASSED IN THE CONSTRUCTOR"; //TODO
 
   public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
     this.keyBuilder = zkKeyBuilder;
@@ -143,4 +145,51 @@ public class ZkUtils {
   public void close() throws ZkInterruptedException {
     zkClient.close();
   }
+
+  /**
+    * Subscribe for changes to the JobModel version.
+    * @param dataListener listener invoked when the JobModel version changes
+    */
+  public void subscribeToJobModelVersionChange(IZkDataListener dataListener) {
+    LOG.info("pid=" + processorId + " subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
+    zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
+  }
+
+  /**
+   * read the jobmodel version from ZK
+   * @return jobmodel version as a string
+   */
+  public String getJobModelVersion() {
+    return zkClient.<String>readData(keyBuilder.getJobModelVersionPath());
+  }
+
+  /**
+   * Ensure that the given persistent paths exist in ZK, creating any that are missing.
+   * @param paths the ZK paths to create if absent
+   */
+  public void makeSurePersistentPathsExists(String[] paths) {
+    for (String path : paths) {
+      if (!zkClient.exists(path)) {
+        zkClient.createPersistent(path, true);
+      }
+    }
+  }
+
+  /**
+   * Subscribe to changes in the list of processors registered in ZK.
+   * @param listener listener invoked when the set of registered processors changes
+   */
+  public void subscribeToProcessorChange(IZkChildListener listener) {
+    LOG.info("pid=" + processorId + " subscribing for child change at:" + keyBuilder.getProcessorsPath());
+    zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
+  }
+
+  public void deleteRoot() {
+    String rootPath = keyBuilder.getRootPath();
+    if (rootPath != null && !rootPath.isEmpty() && zkClient.exists(rootPath)) {
+      LOG.info("pid=" + processorId + " Deleting root: " + rootPath);
+      zkClient.deleteRecursive(rootPath);
+    }
+  }
+
 }
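
A rough sketch (not part of this patch) of how a caller might use the new ZkUtils helpers, mirroring what ZkControllerImpl does on startup; zkUtils is assumed to be built elsewhere:

import org.apache.samza.zk.ZkKeyBuilder;
import org.apache.samza.zk.ZkUtils;

public class ZkUtilsSketch {
  static void bootstrapAndWatch(ZkUtils zkUtils) {
    ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
    // create the fixed coordination paths if they do not exist yet
    zkUtils.makeSurePersistentPathsExists(new String[] {
        keyBuilder.getProcessorsPath(),
        keyBuilder.getJobModelVersionPath(),
        keyBuilder.getJobModelPathPrefix()});
    // the leader watches the processors group for joins and leaves
    zkUtils.subscribeToProcessorChange((parentPath, children) ->
        System.out.println("processors changed: " + children));
    // any processor can read the current JobModel version
    System.out.println("current JobModel version: " + zkUtils.getJobModelVersion());
  }
}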

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
index e04f7c9..8e048b2 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
@@ -45,8 +45,8 @@ public class TestZkKeyBuilder {
   @Test
   public void testParseIdFromPath() {
     Assert.assertEquals(
-        ZkKeyBuilder.PROCESSOR_ID_PREFIX + "1",
-        ZkKeyBuilder.parseIdFromPath("/test/processors/" + ZkKeyBuilder.PROCESSOR_ID_PREFIX + "1"));
+        "1",
+        ZkKeyBuilder.parseIdFromPath("/test/processors/" + "1"));
     Assert.assertNull(ZkKeyBuilder.parseIdFromPath(null));
     Assert.assertNull(ZkKeyBuilder.parseIdFromPath(""));
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index b999ec5..6342fde 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -70,9 +70,13 @@ public class TestZkLeaderElector {
     }
   }
 
+  public static class BooleanResult {
+    public boolean res = false;
+  }
 
   @After
   public void testTeardown() {
+    testZkUtils.deleteRoot();
     testZkUtils.close();
   }
 
@@ -94,8 +98,17 @@ public class TestZkLeaderElector {
         thenReturn(KEY_BUILDER.getProcessorsPath() + "/0000000000");
     when(mockZkUtils.getSortedActiveProcessors()).thenReturn(activeProcessors);
 
-    ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils);
-    Assert.assertTrue(leaderElector.tryBecomeLeader());
+    BooleanResult isLeader = new BooleanResult();
+    ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils,
+      new ZkLeaderElector.ZkLeaderElectorListener() {
+        @Override
+        public void onBecomingLeader() {
+          isLeader.res = true;
+        }
+      }
+    );
+    leaderElector.tryBecomeLeader();
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader.res, 2, 100));
   }
 
   @Test
@@ -104,7 +117,13 @@ public class TestZkLeaderElector {
     ZkUtils mockZkUtils = mock(ZkUtils.class);
     when(mockZkUtils.getSortedActiveProcessors()).thenReturn(new ArrayList<String>());
 
-    ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils);
+    ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+          }
+        }
+    );
     try {
       leaderElector.tryBecomeLeader();
       Assert.fail("Was expecting leader election to fail!");
@@ -118,29 +137,50 @@ public class TestZkLeaderElector {
    */
   @Test
   public void testLeaderElection() {
+    BooleanResult isLeader1 = new BooleanResult();
+    BooleanResult isLeader2 = new BooleanResult();
+    BooleanResult isLeader3 = new BooleanResult();
     // Processor-1
     ZkUtils zkUtils1 = getZkUtilsWithNewClient();
-    ZkLeaderElector leaderElector1 = new ZkLeaderElector(
-        "1",
-        zkUtils1);
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1,
+      new ZkLeaderElector.ZkLeaderElectorListener() {
+        @Override
+        public void onBecomingLeader() {
+          isLeader1.res = true;
+        }
+      }
+    );
 
     // Processor-2
     ZkUtils zkUtils2 = getZkUtilsWithNewClient();
-    ZkLeaderElector leaderElector2 = new ZkLeaderElector(
-        "2",
-        zkUtils2);
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader2.res = true;
+          }
+        }
+    );
 
     // Processor-3
     ZkUtils zkUtils3  = getZkUtilsWithNewClient();
-    ZkLeaderElector leaderElector3 = new ZkLeaderElector(
-        "3",
-        zkUtils3);
+    ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader3.res = true;
+          }
+        });
 
     Assert.assertEquals(0, testZkUtils.getSortedActiveProcessors().size());
 
-    Assert.assertTrue(leaderElector1.tryBecomeLeader());
-    Assert.assertFalse(leaderElector2.tryBecomeLeader());
-    Assert.assertFalse(leaderElector3.tryBecomeLeader());
+    leaderElector1.tryBecomeLeader();
+    leaderElector2.tryBecomeLeader();
+    leaderElector3.tryBecomeLeader();
+
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
 
     Assert.assertEquals(3, testZkUtils.getSortedActiveProcessors().size());
 
@@ -166,16 +206,26 @@ public class TestZkLeaderElector {
     final CountDownLatch electionLatch = new CountDownLatch(1);
     final AtomicInteger count = new AtomicInteger(0);
 
+    BooleanResult isLeader1 = new BooleanResult();
+    BooleanResult isLeader2 = new BooleanResult();
+    BooleanResult isLeader3 = new BooleanResult();
+
+
     // Processor-1
     ZkUtils zkUtils1 = getZkUtilsWithNewClient();
     zkUtils1.registerProcessorAndGetId("processor1");
     ZkLeaderElector leaderElector1 = new ZkLeaderElector(
         "1",
         zkUtils1,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader1.res = true;
+          }
+        },
         new IZkDataListener() {
           @Override
           public void handleDataChange(String dataPath, Object data) throws Exception {
-
           }
 
           @Override
@@ -191,6 +241,12 @@ public class TestZkLeaderElector {
     ZkLeaderElector leaderElector2 = new ZkLeaderElector(
         "2",
         zkUtils2,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader2.res = true;
+          }
+        },
         new IZkDataListener() {
           @Override
           public void handleDataChange(String dataPath, Object data) throws Exception {
@@ -223,6 +279,12 @@ public class TestZkLeaderElector {
     ZkLeaderElector leaderElector3 = new ZkLeaderElector(
         "3",
         zkUtils3,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader3.res = true;
+          }
+        },
         new IZkDataListener() {
           @Override
           public void handleDataChange(String dataPath, Object data) throws Exception {
@@ -236,9 +298,12 @@ public class TestZkLeaderElector {
         });
 
     // Join Leader Election
-    Assert.assertTrue(leaderElector1.tryBecomeLeader());
-    Assert.assertFalse(leaderElector2.tryBecomeLeader());
-    Assert.assertFalse(leaderElector3.tryBecomeLeader());
+    leaderElector1.tryBecomeLeader();
+    leaderElector2.tryBecomeLeader();
+    leaderElector3.tryBecomeLeader();
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
 
     Assert.assertTrue(leaderElector1.amILeader());
     Assert.assertFalse(leaderElector2.amILeader());
@@ -278,12 +343,22 @@ public class TestZkLeaderElector {
     final CountDownLatch electionLatch = new CountDownLatch(1);
     final AtomicInteger count = new AtomicInteger(0);
 
+    BooleanResult isLeader1 = new BooleanResult();
+    BooleanResult isLeader2 = new BooleanResult();
+    BooleanResult isLeader3 = new BooleanResult();
+
     // Processor-1
     ZkUtils zkUtils1 = getZkUtilsWithNewClient();
     zkUtils1.registerProcessorAndGetId("processor1");
     ZkLeaderElector leaderElector1 = new ZkLeaderElector(
         "1",
         zkUtils1,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader1.res = true;
+          }
+        },
         new IZkDataListener() {
           @Override
           public void handleDataChange(String dataPath, Object data) throws Exception {
@@ -302,6 +377,12 @@ public class TestZkLeaderElector {
     ZkLeaderElector leaderElector2 = new ZkLeaderElector(
         "2",
         zkUtils2,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader2.res = true;
+          }
+        },
         new IZkDataListener() {
           @Override
           public void handleDataChange(String dataPath, Object data) throws Exception {
@@ -320,6 +401,12 @@ public class TestZkLeaderElector {
     ZkLeaderElector leaderElector3 = new ZkLeaderElector(
         "3",
         zkUtils3,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader3.res = true;
+          }
+        },
         new IZkDataListener() {
           @Override
           public void handleDataChange(String dataPath, Object data) throws Exception {
@@ -347,9 +434,12 @@ public class TestZkLeaderElector {
         });
 
     // Join Leader Election
-    Assert.assertTrue(leaderElector1.tryBecomeLeader());
-    Assert.assertFalse(leaderElector2.tryBecomeLeader());
-    Assert.assertFalse(leaderElector3.tryBecomeLeader());
+    leaderElector1.tryBecomeLeader();
+    leaderElector2.tryBecomeLeader();
+    leaderElector3.tryBecomeLeader();
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
 
     List<String> currentActiveProcessors = testZkUtils.getSortedActiveProcessors();
     Assert.assertEquals(3, currentActiveProcessors.size());
@@ -373,15 +463,29 @@ public class TestZkLeaderElector {
 
   @Test
   public void testAmILeader() {
+    BooleanResult isLeader1 = new BooleanResult();
+    BooleanResult isLeader2 = new BooleanResult();
     // Processor-1
     ZkLeaderElector leaderElector1 = new ZkLeaderElector(
         "1",
-        getZkUtilsWithNewClient());
+        getZkUtilsWithNewClient(),
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader1.res = true;
+          }
+        });
 
     // Processor-2
     ZkLeaderElector leaderElector2 = new ZkLeaderElector(
         "2",
-        getZkUtilsWithNewClient());
+        getZkUtilsWithNewClient(),
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader2.res = true;
+          }
+        });
 
     // Before Leader Election
     Assert.assertFalse(leaderElector1.amILeader());
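
The behavioral change driving these test edits: leadership is no longer asserted on the
return value of tryBecomeLeader(); it is reported asynchronously through the new
ZkLeaderElectorListener callback, so the tests poll with TestZkUtils.testWithDelayBackOff.
A condensed sketch of that pattern (not from the patch), written as if it sat inside
TestZkLeaderElector so it can reuse getZkUtilsWithNewClient(); the lambda assumes
ZkLeaderElectorListener declares only onBecomingLeader(), as the anonymous classes above
suggest.

  @Test
  public void leaderElectionViaListenerSketch() {
    java.util.concurrent.atomic.AtomicBoolean becameLeader =
        new java.util.concurrent.atomic.AtomicBoolean(false);

    ZkUtils zkUtils = getZkUtilsWithNewClient();
    ZkLeaderElector elector = new ZkLeaderElector("1", zkUtils,
        () -> becameLeader.set(true));   // lambda stands in for the single-method listener

    elector.tryBecomeLeader();           // outcome observed via the callback, not a return value
    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(becameLeader::get, 2, 100));

    zkUtils.close();
  }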

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 855d29d..b719e28 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -18,6 +18,8 @@
  */
 package org.apache.samza.zk;
 
+import java.util.function.BooleanSupplier;
+import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
@@ -32,10 +34,10 @@ import org.junit.Test;
 public class TestZkUtils {
   private static EmbeddedZookeeper zkServer = null;
   private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
-  private ZkConnection zkConnection = null;
   private ZkClient zkClient = null;
   private static final int SESSION_TIMEOUT_MS = 20000;
   private static final int CONNECTION_TIMEOUT_MS = 10000;
+  private ZkUtils zkUtils;
 
   @BeforeClass
   public static void setup() throws InterruptedException {
@@ -57,11 +59,21 @@ public class TestZkUtils {
     } catch (ZkNodeExistsException e) {
       // Do nothing
     }
+
+
+    zkUtils = new ZkUtils(
+        KEY_BUILDER,
+        zkClient,
+        SESSION_TIMEOUT_MS);
+
+    zkUtils.connect();
+
   }
 
 
   @After
   public void testTeardown() {
+    zkUtils.close();
     zkClient.close();
   }
 
@@ -72,34 +84,91 @@ public class TestZkUtils {
 
   @Test
   public void testRegisterProcessorId() {
-    ZkUtils utils = new ZkUtils(
-        KEY_BUILDER,
-        zkClient,
-        SESSION_TIMEOUT_MS);
-    utils.connect();
-    String assignedPath = utils.registerProcessorAndGetId("0.0.0.0");
+    String assignedPath = zkUtils.registerProcessorAndGetId("0.0.0.0");
     Assert.assertTrue(assignedPath.startsWith(KEY_BUILDER.getProcessorsPath()));
 
     // Calling registerProcessorId again should return the same ephemeralPath as long as the session is valid
-    Assert.assertTrue(utils.registerProcessorAndGetId("0.0.0.0").equals(assignedPath));
+    Assert.assertTrue(zkUtils.registerProcessorAndGetId("0.0.0.0").equals(assignedPath));
 
-    utils.close();
   }
 
   @Test
   public void testGetActiveProcessors() {
-    ZkUtils utils = new ZkUtils(
-        KEY_BUILDER,
-        zkClient,
-        SESSION_TIMEOUT_MS);
-    utils.connect();
+    Assert.assertEquals(0, zkUtils.getSortedActiveProcessors().size());
+    zkUtils.registerProcessorAndGetId("processorData");
 
-    Assert.assertEquals(0, utils.getSortedActiveProcessors().size());
-    utils.registerProcessorAndGetId("processorData");
+    Assert.assertEquals(1, zkUtils.getSortedActiveProcessors().size());
 
-    Assert.assertEquals(1, utils.getSortedActiveProcessors().size());
+  }
 
-    utils.close();
+  @Test
+  public void testSubscribeToJobModelVersionChange() {
+
+    ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
+    String root = keyBuilder.getRootPath();
+    zkClient.deleteRecursive(root);
+
+    class Result {
+      String res = "";
+      public String getRes() {
+        return res;
+      }
+      public void updateRes(String newRes) {
+        res = newRes;
+      }
+    }
+
+    Assert.assertFalse(zkUtils.exists(root));
+
+    // create the paths
+    zkUtils.makeSurePersistentPathsExists(
+        new String[]{root, keyBuilder.getJobModelVersionPath(), keyBuilder.getProcessorsPath()});
+    Assert.assertTrue(zkUtils.exists(root));
+    Assert.assertTrue(zkUtils.exists(keyBuilder.getJobModelVersionPath()));
+    Assert.assertTrue(zkUtils.exists(keyBuilder.getProcessorsPath()));
+
+    final Result res = new Result();
+    // define the callback
+    IZkDataListener dataListener = new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data)
+          throws Exception {
+        res.updateRes((String) data);
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath)
+          throws Exception {
+        Assert.fail("Data wasn't deleted;");
+      }
+    };
+    // subscribe
+    zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
+    zkClient.subscribeDataChanges(keyBuilder.getProcessorsPath(), dataListener);
+    // update
+    zkClient.writeData(keyBuilder.getJobModelVersionPath(), "newVersion");
+
+    // verify
+    Assert.assertTrue(testWithDelayBackOff(() -> "newVersion".equals(res.getRes()), 2, 1000));
+
+    // update again
+    zkClient.writeData(keyBuilder.getProcessorsPath(), "newProcessor");
+
+    Assert.assertTrue(testWithDelayBackOff(() -> "newProcessor".equals(res.getRes()), 2, 1000));
   }
 
+  public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) {
+    long delay = startDelayMs;
+    while (delay < maxDelayMs) {
+      if (cond.getAsBoolean())
+        return true;
+      try {
+        Thread.sleep(delay);
+      } catch (InterruptedException e) {
+        return false;
+      }
+      delay *= 2;
+    }
+    return false;
+  }
 }
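
A note on the timing of the helper just added: the condition is evaluated at the top of each
iteration and the delay doubles after every sleep, so testWithDelayBackOff(cond, 2, 100)
checks cond after roughly 0, 2, 6, 14, 30 and 62 ms of accumulated sleep and gives up
(returning false) after about 126 ms. A tiny hypothetical usage sketch:

  // Hypothetical: wait briefly for a flag flipped by another thread or a ZK callback,
  // instead of a fixed Thread.sleep() in the test.
  java.util.concurrent.atomic.AtomicBoolean done = new java.util.concurrent.atomic.AtomicBoolean(false);
  new Thread(() -> done.set(true)).start();
  Assert.assertTrue(TestZkUtils.testWithDelayBackOff(done::get, 2, 100));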


[7/9] samza git commit: SAMZA-1107: Job model publish

Posted by jm...@apache.org.
SAMZA-1107: Job model publish

Add utils for publishing the job model and the job model version to ZK.

Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Author: Boris Shkolnik <bo...@apache.org>
Author: navina <na...@apache.org>

Reviewers: Navina Ramesh <na...@apache.org>, Fred Ji <fr...@yahoo.com>

Closes #67 from sborya/JobModelPublish1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d104013e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d104013e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d104013e

Branch: refs/heads/samza-fluent-api-v1
Commit: d104013ef16ffd959916a52e9e3f6e67a6e486b3
Parents: 4d7b3b3
Author: Boris Shkolnik <bo...@apache.org>
Authored: Wed Mar 1 13:49:29 2017 -0800
Committer: navina <na...@apache.org>
Committed: Wed Mar 1 13:49:29 2017 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 75 +++++++++++++++++++-
 .../org/apache/samza/zk/TestZkKeyBuilder.java   | 12 ++++
 .../java/org/apache/samza/zk/TestZkUtils.java   | 46 ++++++++++--
 3 files changed, 128 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d104013e/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 320cd49..73376b1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.zk;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -27,6 +28,11 @@ import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.zookeeper.data.Stat;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -155,6 +161,44 @@ public class ZkUtils {
   }
 
   /**
+   * Publishes new job model into ZK.
+   * This call should FAIL if the node already exists.
+   * @param jobModelVersion version of the jobModel to publish
+   * @param jobModel jobModel to publish
+   *
+   */
+  public void publishJobModel(String jobModelVersion, JobModel jobModel) {
+    try {
+      ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
+      String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
+      LOG.info("pid=" + processorId + " jobModelAsString=" + jobModelStr);
+      zkClient.createPersistent(keyBuilder.getJobModelPath(jobModelVersion), jobModelStr);
+      LOG.info("wrote jobModel path =" + keyBuilder.getJobModelPath(jobModelVersion));
+    } catch (Exception e) {
+      LOG.error("JobModel publish failed for version=" + jobModelVersion, e);
+      throw new SamzaException(e);
+    }
+  }
+
+  /**
+   * get the job model from ZK by version
+   * @param jobModelVersion jobModel version to get
+   * @return job model for this version
+   */
+  public JobModel getJobModel(String jobModelVersion) {
+    LOG.info("pid=" + processorId + "read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
+    Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
+    ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
+    JobModel jm;
+    try {
+      jm = mmapper.readValue((String) data, JobModel.class);
+    } catch (IOException e) {
+      throw new SamzaException("failed to read JobModel from ZK", e);
+    }
+    return jm;
+  }
+
+  /**
    * read the jobmodel version from ZK
    * @return jobmodel version as a string
    */
@@ -163,6 +207,36 @@ public class ZkUtils {
   }
 
   /**
+   * publish the version number of the next JobModel
+   * @param oldVersion - used to validate that no one has changed the version in the meantime.
+   * @param newVersion - new version.
+   */
+  public void publishJobModelVersion(String oldVersion, String newVersion) {
+    Stat stat = new Stat();
+    String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat);
+    LOG.info("pid=" + processorId + " publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat
+        .getVersion() + ")");
+
+    if (currentVersion != null && !currentVersion.equals(oldVersion)) {
+      throw new SamzaException(
+          "Someone change JobModelVersion while the leader was generating one: expected" + oldVersion + ", got " + currentVersion);
+    }
+    // data version is the ZK version of the data from the ZK.
+    int dataVersion = stat.getVersion();
+    try {
+      stat = zkClient.writeDataReturnStat(keyBuilder.getJobModelVersionPath(), newVersion, dataVersion);
+    } catch (Exception e) {
+      String msg = "publish job model version failed for new version = " + newVersion + "; old version = " + oldVersion;
+      LOG.error(msg, e);
+      throw new SamzaException(msg, e);
+    }
+    LOG.info("pid=" + processorId +
+        " published new version: " + newVersion + "; expected data version = " + (dataVersion  + 1) + "(actual data version after update = " + stat.getVersion()
+        +    ")");
+  }
+
+
+  /**
    * verify that given paths exist in ZK
    * @param paths - paths to verify or create
    */
@@ -190,5 +264,4 @@ public class ZkUtils {
       zkClient.deleteRecursive(rootPath);
     }
   }
-
 }
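
A minimal leader-side usage sketch (not from the patch) of the publish/read utilities above,
mirroring testPublishNewJobModel later in this commit. It assumes zkUtils is a connected
ZkUtils whose persistent parent paths (root, jobModels prefix, jobModelVersion) already exist,
e.g. via makeSurePersistentPathsExists(...); the hard-coded version strings and the empty
JobModel follow the test rather than any production code path, and the imports match those
the test adds.

  String oldVersion = "0";
  String newVersion = "1";

  Map<String, String> configMap = new HashMap<>();
  Map<Integer, ContainerModel> containers = new HashMap<>();
  JobModel jobModel = new JobModel(new MapConfig(configMap), containers);

  // Persist the model at <root>/jobModels/<newVersion>; this fails if the znode already exists.
  zkUtils.publishJobModel(newVersion, jobModel);

  // Advance the version pointer; throws SamzaException if someone else bumped it first.
  zkUtils.publishJobModelVersion(oldVersion, newVersion);

  // Followers woken by subscribeToJobModelVersionChange() can then read the model back.
  JobModel applied = zkUtils.getJobModel(zkUtils.getJobModelVersion());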

http://git-wip-us.apache.org/repos/asf/samza/blob/d104013e/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
index 8e048b2..b56d279 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
@@ -50,4 +50,16 @@ public class TestZkKeyBuilder {
     Assert.assertNull(ZkKeyBuilder.parseIdFromPath(null));
     Assert.assertNull(ZkKeyBuilder.parseIdFromPath(""));
   }
+
+  @Test
+  public void testJobModelPath() {
+
+    ZkKeyBuilder builder = new ZkKeyBuilder("test");
+
+    Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_VERSION_PATH, builder.getJobModelVersionPath());
+    Assert.assertEquals("/test/jobModels", builder.getJobModelPathPrefix());
+    String version = "2";
+    Assert.assertEquals("/test/jobModels/" + version, builder.getJobModelPath(version));
+    Assert.assertEquals("/test/versionBarriers", builder.getJobModelVersionBarrierPrefix());
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/d104013e/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index b719e28..58c3ed6 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -18,11 +18,17 @@
  */
 package org.apache.samza.zk;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.function.BooleanSupplier;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -60,7 +66,6 @@ public class TestZkUtils {
       // Do nothing
     }
 
-
     zkUtils = new ZkUtils(
         KEY_BUILDER,
         zkClient,
@@ -96,11 +101,9 @@ public class TestZkUtils {
   public void testGetActiveProcessors() {
     Assert.assertEquals(0, zkUtils.getSortedActiveProcessors().size());
     zkUtils.registerProcessorAndGetId("processorData");
-
     Assert.assertEquals(1, zkUtils.getSortedActiveProcessors().size());
-
   }
-
+  
   @Test
   public void testSubscribeToJobModelVersionChange() {
 
@@ -157,6 +160,41 @@ public class TestZkUtils {
     Assert.assertTrue(testWithDelayBackOff(() -> "newProcessor".equals(res.getRes()), 2, 1000));
   }
 
+  @Test
+  public void testPublishNewJobModel() {
+    ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
+    String root = keyBuilder.getRootPath();
+    zkClient.deleteRecursive(root);
+    String version = "1";
+    String oldVersion = "0";
+
+    zkUtils.makeSurePersistentPathsExists(
+        new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
+
+    zkUtils.publishJobModelVersion(oldVersion, version);
+    Assert.assertEquals(version, zkUtils.getJobModelVersion());
+
+    String newerVersion = Long.toString(Long.valueOf(version) + 1);
+    zkUtils.publishJobModelVersion(version, newerVersion);
+    Assert.assertEquals(newerVersion, zkUtils.getJobModelVersion());
+
+    try {
+      zkUtils.publishJobModelVersion(oldVersion, "10"); //invalid new version
+      Assert.fail("publish invalid version should've failed");
+    } catch (SamzaException e) {
+      // expected
+    }
+
+    // create job model
+    Map<String, String> configMap = new HashMap<>();
+    Map<Integer, ContainerModel> containers = new HashMap<>();
+    MapConfig config = new MapConfig(configMap);
+    JobModel jobModel = new JobModel(config, containers);
+
+    zkUtils.publishJobModel(version, jobModel);
+    Assert.assertEquals(jobModel, zkUtils.getJobModel(version));
+  }
+
   public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) {
     long delay = startDelayMs;
     while (delay < maxDelayMs) {


[5/9] samza git commit: Fix a rendering issue in the Samza security web-page

Posted by jm...@apache.org.
Fix a rendering issue in the Samza security web-page


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c58d74b3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c58d74b3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c58d74b3

Branch: refs/heads/samza-fluent-api-v1
Commit: c58d74b355d8406dae7428b6ccc15c634a964216
Parents: 098a11d
Author: vjagadish1989 <jv...@linkedin.com>
Authored: Mon Feb 27 17:38:17 2017 -0800
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Mon Feb 27 17:39:00 2017 -0800

----------------------------------------------------------------------
 docs/learn/documentation/versioned/yarn/yarn-security.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c58d74b3/docs/learn/documentation/versioned/yarn/yarn-security.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/yarn/yarn-security.md b/docs/learn/documentation/versioned/yarn/yarn-security.md
index 848326b..8d164c6 100644
--- a/docs/learn/documentation/versioned/yarn/yarn-security.md
+++ b/docs/learn/documentation/versioned/yarn/yarn-security.md
@@ -1,7 +1,7 @@
 ---
 layout: page
 title: YARN Security
---------------------
+---
 <!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with


[9/9] samza git commit: Merge master

Posted by jm...@apache.org.
Merge master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/72fc185a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/72fc185a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/72fc185a

Branch: refs/heads/samza-fluent-api-v1
Commit: 72fc185a052815ae604ec9d50aa47dc6b2d94d92
Parents: a83c69a e6c1eed
Author: Jacob Maes <jm...@linkedin.com>
Authored: Mon Mar 6 15:40:28 2017 -0800
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Mon Mar 6 16:18:13 2017 -0800

----------------------------------------------------------------------
 docs/learn/documentation/versioned/index.html   |   2 +-
 .../versioned/jobs/configuration-table.html     | 225 ++++++++++++++-----
 .../documentation/versioned/jobs/logging.md     |   5 +
 .../versioned/yarn/yarn-security.md             |   2 +-
 .../samza/system/ExecutionEnvironment.java      |  29 ++-
 .../org/apache/samza/system/StreamProvider.java |  78 -------
 .../org/apache/samza/system/StreamSpec.java     |   2 +-
 .../processor/SamzaContainerController.java     |   1 +
 .../apache/samza/processor/StreamProcessor.java |  10 +-
 .../system/AbstractExecutionEnvironment.java    |  52 +++--
 .../samza/zk/BarrierForVersionUpgrade.java      |  46 ++++
 .../samza/zk/ScheduleAfterDebounceTime.java     |   8 +-
 .../samza/zk/ZkBarrierForVersionUpgrade.java    | 166 ++++++++++++++
 .../java/org/apache/samza/zk/ZkController.java  |  32 +++
 .../org/apache/samza/zk/ZkControllerImpl.java   | 163 ++++++++++++++
 .../apache/samza/zk/ZkControllerListener.java   |  34 +++
 .../java/org/apache/samza/zk/ZkKeyBuilder.java  |  24 +-
 .../org/apache/samza/zk/ZkLeaderElector.java    |  36 ++-
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 129 ++++++++++-
 .../org/apache/samza/config/JobConfig.scala     |   8 -
 .../org/apache/samza/config/StreamConfig.scala  | 136 +++++++----
 .../apache/samza/task/ReadableCoordinator.scala |   1 +
 .../samza/example/TestBasicStreamGraphs.java    |   7 +-
 .../samza/example/TestBroadcastExample.java     |   9 +-
 .../apache/samza/example/TestJoinExample.java   |   2 +-
 .../apache/samza/example/TestWindowExample.java |   2 +-
 .../TestAbstractExecutionEnvironment.java       |  60 ++---
 .../zk/TestZkBarrierForVersionUpgrade.java      | 148 ++++++++++++
 .../org/apache/samza/zk/TestZkKeyBuilder.java   |  16 +-
 .../apache/samza/zk/TestZkLeaderElector.java    | 163 +++++++++++---
 .../java/org/apache/samza/zk/TestZkUtils.java   | 143 ++++++++++--
 .../apache/samza/config/Log4jSystemConfig.java  |  12 +-
 32 files changed, 1430 insertions(+), 321 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/72fc185a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --cc samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index fbfe90f,9d6cbc2..5f22409
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@@ -43,9 -43,7 +43,8 @@@ object JobConfig 
    val SAMZA_FWK_PATH = "samza.fwk.path"
    val SAMZA_FWK_VERSION = "samza.fwk.version"
    val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
-   val JOB_METADATA_DEFAULT_SYSTEM = "job.metadata.system"
    val JOB_DEFAULT_SYSTEM = "job.default.system"
 +  val JOB_DEFAULT_PARTITIONS = "job.default.partitions"
    val JOB_CONTAINER_COUNT = "job.container.count"
    val jOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
    val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"

http://git-wip-us.apache.org/repos/asf/samza/blob/72fc185a/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
----------------------------------------------------------------------
diff --cc samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
index 8ecd44f,8ecd44f..f817379
--- a/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
@@@ -21,6 -21,6 +21,7 @@@ package org.apache.samza.example
  import java.lang.reflect.Field;
  import org.apache.samza.Partition;
  import org.apache.samza.config.Config;
++import org.apache.samza.config.MapConfig;
  import org.apache.samza.operators.impl.OperatorGraph;
  import org.apache.samza.system.SystemStreamPartition;
  import org.apache.samza.task.StreamOperatorTask;
@@@ -47,7 -47,7 +48,7 @@@ public class TestBasicStreamGraphs 
  
    @Test
    public void testUserTask() throws Exception {
--    Config mockConfig = mock(Config.class);
++    Config mockConfig = spy(new MapConfig());
      TaskContext mockContext = mock(TaskContext.class);
      when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
      TestWindowExample userTask = new TestWindowExample(this.inputPartitions);
@@@ -64,7 -64,7 +65,7 @@@
  
    @Test
    public void testSplitTask() throws Exception {
--    Config mockConfig = mock(Config.class);
++    Config mockConfig = spy(new MapConfig());
      TaskContext mockContext = mock(TaskContext.class);
      when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
      TestBroadcastExample splitTask = new TestBroadcastExample(this.inputPartitions);
@@@ -81,7 -81,7 +82,7 @@@
  
    @Test
    public void testJoinTask() throws Exception {
--    Config mockConfig = mock(Config.class);
++    Config mockConfig = spy(new MapConfig());
      TaskContext mockContext = mock(TaskContext.class);
      when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
      TestJoinExample joinTask = new TestJoinExample(this.inputPartitions);

http://git-wip-us.apache.org/repos/asf/samza/blob/72fc185a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/72fc185a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------


[3/9] samza git commit: Fixing checkstyle error in StreamGraphImpl causing build failures

Posted by jm...@apache.org.
Fixing checkstyle error in StreamGraphImpl causing build failures


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/65df0b5d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/65df0b5d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/65df0b5d

Branch: refs/heads/samza-fluent-api-v1
Commit: 65df0b5df2931f9383e4b1f21e5b9f0901b6286f
Parents: f1bc1d0
Author: navina <na...@apache.org>
Authored: Fri Feb 24 18:17:29 2017 -0800
Committer: navina <na...@apache.org>
Committed: Fri Feb 24 18:17:44 2017 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/samza/operators/StreamGraphImpl.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/65df0b5d/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 353f455..8ca8157 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -207,7 +207,7 @@ public class StreamGraphImpl implements StreamGraph {
    * @return  a {@link MessageStreamImpl} object corresponding to the {@code systemStream}
    */
   public MessageStreamImpl getInputStream(SystemStream sstream) {
-    for(MessageStream entry: this.inStreams.values()) {
+    for (MessageStream entry: this.inStreams.values()) {
       if (((InputStreamImpl) entry).getSpec().getSystemName() == sstream.getSystem() &&
           ((InputStreamImpl) entry).getSpec().getPhysicalName() == sstream.getStream()) {
         return (MessageStreamImpl) entry;