You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/05/23 00:58:36 UTC

[1/6] samza git commit: SAMZA-1258. Integration tests. Happy Path.

Repository: samza
Updated Branches:
  refs/heads/0.13.0 864562444 -> 06860479f


SAMZA-1258. Integration tests. Happy Path.

This is reincarnation of PR #157
It was not possible to merge/rebase that branch, so I had to create another one and reapply the changes.

Author: Boris Shkolnik <bo...@apache.org>

Reviewers: Xinyu Liu <xi...@apache.org>

Closes #178 from sborya/unitTestHappyPath4


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4011459b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4011459b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4011459b

Branch: refs/heads/0.13.0
Commit: 4011459b0d4e09fdc5b9bd6c6bcdfe0050fcc671
Parents: 8645624
Author: Boris Shkolnik <bo...@apache.org>
Authored: Fri May 19 10:03:00 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon May 22 17:56:10 2017 -0700

----------------------------------------------------------------------
 .../samza/zk/ZkCoordinationServiceFactory.java  |  12 +-
 .../org/apache/samza/zk/ZkJobCoordinator.java   |   1 +
 .../test/processor/TestZkStreamProcessor.java   | 291 +++++++++++++++++++
 .../processor/TestZkStreamProcessorBase.java    | 270 +++++++++++++++++
 4 files changed, 572 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4011459b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
index 07da147..9971732 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
@@ -19,16 +19,24 @@
 package org.apache.samza.zk;
 
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ZkConfig;
-import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.CoordinationServiceFactory;
+import org.apache.samza.coordinator.CoordinationUtils;
+
 
 public class ZkCoordinationServiceFactory implements CoordinationServiceFactory {
   // TODO - Why should this method be synchronized?
   synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
     ZkConfig zkConfig = new ZkConfig(config);
-    ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+    ZkClient zkClient;
+    try {
+      zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+    } catch (Exception e) {
+      throw new SamzaException("zkClient failed to connect to ZK at :" + zkConfig.getZkConnect(), e);
+    }
+
     ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
     return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, new ScheduleAfterDebounceTime());
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4011459b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 91249de..6ad10d2 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -89,6 +89,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     if (coordinatorListener != null) {
       coordinatorListener.onJobModelExpired();
     }
+
     debounceTimer.stopScheduler();
     zkController.stop();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/4011459b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java
new file mode 100644
index 0000000..b409532
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.processor;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.samza.processor.StreamProcessor;
+import org.apache.samza.zk.TestZkUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Happy path tests.
+ * Start 1, 2, 5 processors and make sure they all consume all the events.
+ */
+public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
+
+  private AtomicInteger counter = new AtomicInteger(1);
+  private String testSystem;
+  private String inputTopic;
+  private String outputTopic;
+  private int messageCount = 40;
+
+  private Map<String, String> map;
+
+
+  @Before
+  public void setupTest() {
+    // for each tests - make the common parts unique
+    int seqNum = counter.getAndAdd(1);
+    testSystem = "test-system" + seqNum;
+    inputTopic = "numbers" + seqNum;
+    outputTopic = "output" + seqNum;
+
+    map = createConfigs(testSystem, inputTopic, outputTopic, messageCount);
+
+    // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a
+    // TopicExistsException since StreamProcessor auto-creates them.
+    createTopics(inputTopic, outputTopic);
+  }
+
+  @Test
+  public void testSingleStreamProcessor() {
+    testStreamProcessor(new String[]{"1"});
+  }
+
+  @Test
+  public void testTwoStreamProcessors() {
+    testStreamProcessor(new String[]{"1", "2"});
+  }
+
+  @Test
+  public void testFiveStreamProcessors() {
+    testStreamProcessor(new String[]{"1", "2", "3", "4", "5"});
+  }
+
+  // main test method for happy path with fixed number of processors
+  private void testStreamProcessor(String[] processorIds) {
+
+    // create a latch of the size == number of messages
+    TestZkStreamProcessorBase.TestStreamTask.endLatch = new CountDownLatch(messageCount);
+
+    // initialize the the processors
+    // we need startLatch to know when the processor has been completely initialized
+    StreamProcessor[] streamProcessors = new StreamProcessor[processorIds.length];
+    CountDownLatch[] startCountDownLatches = new CountDownLatch[processorIds.length];
+    for (int i = 0; i < processorIds.length; i++) {
+      startCountDownLatches[i] = new CountDownLatch(1);
+      streamProcessors[i] = createStreamProcessor(processorIds[i], map, startCountDownLatches[i], null);
+    }
+
+    // produce messageCount messages, starting with key '0'
+    produceMessages(0, inputTopic, messageCount);
+
+    // run the processors in a separate threads
+    Thread[] threads = new Thread[processorIds.length];
+    for (int i = 0; i < processorIds.length; i++) {
+      threads[i] = runInThread(streamProcessors[i], TestZkStreamProcessorBase.TestStreamTask.endLatch);
+      threads[i].start();
+      // wait until the processor reports that it has started
+      try {
+        startCountDownLatches[i].await(1000, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        Assert.fail("got interrupted while waiting for the " + i + "th processor to start.");
+      }
+    }
+
+    // wait until all the events are consumed
+    try {
+      TestZkStreamProcessorBase.TestStreamTask.endLatch.await(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      Assert.fail("endLatch.await failed with an interruption:" + e.getLocalizedMessage());
+    }
+
+    // collect all the threads
+    try {
+      for (Thread t : threads) {
+        synchronized (t) {
+          t.notify(); // to stop the thread
+        }
+        t.join(1000);
+      }
+    } catch (InterruptedException e) {
+      Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage());
+    }
+
+    verifyNumMessages(outputTopic, messageCount, messageCount);
+  }
+
+  @Test
+  /**
+   * Similar to the previous tests, but add another processor in the middle
+   */ public void testStreamProcessorWithAdd() {
+
+    // set number of events we expect wo read by both processes in total:
+    // p1 - reads 'messageCount' at first
+    // p1 and p2 read all messageCount together, since they start from the beginning.
+    // so we expect total 3 x messageCounts
+    int totalEventsToGenerate = 3 * messageCount;
+    TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate);
+
+    // create first processor
+    CountDownLatch startCountDownLatch1 = new CountDownLatch(1);
+    StreamProcessor sp = createStreamProcessor("1", map, startCountDownLatch1, null);
+
+    // produce first batch of messages starting with 0
+    produceMessages(0, inputTopic, messageCount);
+
+    // start the first processor
+    Thread t1 = runInThread(sp, TestStreamTask.endLatch);
+    t1.start();
+
+    // wait until the processor reports that it has started
+    try {
+      startCountDownLatch1.await(1000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Assert.fail("got interrupted while waiting for the first processor to start.");
+    }
+
+    // make sure it consumes all the messages from the first batch
+    waitUntilConsumedN(totalEventsToGenerate - messageCount);
+
+    // start the second processor
+    CountDownLatch countDownLatch2 = new CountDownLatch(1);
+    StreamProcessor sp2 = createStreamProcessor("2", map, countDownLatch2, null);
+    Thread t2 = runInThread(sp2, TestStreamTask.endLatch);
+    t2.start();
+
+    // wait until the processor reports that it has started
+    try {
+      countDownLatch2.await(1000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Assert.fail("got interrupted while waiting for the 2nd processor to start.");
+    }
+
+    // wait for at least one full debounce time to let the system to publish and distribute the new job model
+    TestZkUtils.sleepMs(3000);
+
+    // produce the second batch of the messages, starting with 'messageCount'
+    produceMessages(messageCount, inputTopic, messageCount);
+
+    // wait until all the events are consumed
+    // make sure it consumes all the messages from the first batch
+    waitUntilConsumedN(0);
+
+    // shutdown both
+    try {
+      synchronized (t1) {
+        t1.notify();
+      }
+      synchronized (t2) {
+        t2.notify();
+      }
+      t1.join(1000);
+      t2.join(1000);
+    } catch (InterruptedException e) {
+      Assert.fail("Failed to join finished threads:" + e.getLocalizedMessage());
+    }
+
+    // p1 will read messageCount events, and then p1 and p2 will read 2xmessageCount events together,
+    // but the expected values are the same 0-79, they will appear in the output more then once, but we should mark then only one time.
+    // total number of events we gonna get is 80+40=120
+    verifyNumMessages(outputTopic, 2 * messageCount, totalEventsToGenerate);
+  }
+
+  @Test
+  /**
+   * same as other happy path messages, but with one processor removed in the middle
+   */ public void testStreamProcessorWithRemove() {
+
+    // set number of events we expect to read by both processes in total:
+    // p1 and p2 - both read messageCount at first and p1 is shutdown, new batch of events is generated
+    // and p2 will read all of them from the beginning (+ 2 x messageCounts, total 3 x)
+    int totalEventsToGenerate = 3 * messageCount;
+    TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate);
+
+    // create first processor
+    CountDownLatch startCountDownLatch1 = new CountDownLatch(1);
+    CountDownLatch stopCountDownLatch1 = new CountDownLatch(1);
+    StreamProcessor sp1 = createStreamProcessor("1", map, startCountDownLatch1, stopCountDownLatch1);
+
+    // start the first processor
+    Thread t1 = runInThread(sp1, TestStreamTask.endLatch);
+    t1.start();
+
+    // start the second processor
+    CountDownLatch countDownLatch2 = new CountDownLatch(1);
+    StreamProcessor sp2 = createStreamProcessor("2", map, countDownLatch2, null);
+    Thread t2 = runInThread(sp2, TestStreamTask.endLatch);
+    t2.start();
+
+    // wait until the processor reports that it has started
+    try {
+      startCountDownLatch1.await(1000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Assert.fail("got interrupted while waiting for the first processor to start.");
+    }
+
+    // wait until the processor reports that it has started
+    try {
+      countDownLatch2.await(1000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Assert.fail("got interrupted while waiting for the 2nd processor to start.");
+    }
+
+    // produce first batch of messages starting with 0
+    produceMessages(0, inputTopic, messageCount);
+
+    // make sure they consume all the messages from the first batch
+    waitUntilConsumedN(totalEventsToGenerate - messageCount);
+
+    // stop the first processor
+    synchronized (t1) {
+      t1.notify(); // this should stop it
+    }
+
+    // wait until it's really down
+    try {
+      stopCountDownLatch1.await(1000, TimeUnit.MILLISECONDS);
+      System.out.println("Processor 1 is down");
+    } catch (InterruptedException e) {
+      Assert.fail("got interrupted while waiting for the 1st processor to stop.");
+    }
+
+    // wait for at least one full debounce time to let the system to publish and distribute the new job model
+    TestZkUtils.sleepMs(3000);
+
+    // produce the second batch of the messages, starting with 'messageCount'
+    produceMessages(messageCount, inputTopic, messageCount);
+
+    // wait until p2 consumes all the message by itself;
+    waitUntilConsumedN(0);
+
+    // shutdown p2
+
+    try {
+      synchronized (t2) {
+        t2.notify();
+      }
+      t2.join(1000);
+    } catch (InterruptedException e) {
+      Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage());
+    }
+
+    // processor1 and 2 will both read 20 events (total 40), and then processor2 read 80 events by itself,
+    // but the expected values are the same 0-79 - we should get each value one time.
+    // Meanwhile the number of events we gonna get is 40 + 80
+    verifyNumMessages(outputTopic, 2 * messageCount, totalEventsToGenerate);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4011459b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java
new file mode 100644
index 0000000..9320feb
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.processor;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.processor.StreamProcessor;
+import org.apache.samza.processor.StreamProcessorLifecycleListener;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.test.StandaloneIntegrationTestHarness;
+import org.apache.samza.test.StandaloneTestUtils;
+import org.apache.samza.zk.TestZkUtils;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness {
+  public final static Logger LOG = LoggerFactory.getLogger(TestZkStreamProcessorBase.class);
+  public final static int BAD_MESSAGE_KEY = 1000;
+  // to avoid long sleeps, we rather use multiple attempts with shorter sleeps
+  private final static int ATTEMPTS_NUMBER = 5;
+
+
+
+  // auxiliary methods
+  protected StreamProcessor createStreamProcessor(final String pId, Map<String, String> map,
+      final CountDownLatch startLatchCountDown, final CountDownLatch stopLatchCountDown) {
+    map.put(ApplicationConfig.PROCESSOR_ID, pId);
+
+    StreamProcessor processor = new StreamProcessor(new MapConfig(map), new HashMap<>(), TestStreamTask::new,
+        new StreamProcessorLifecycleListener() {
+
+          @Override
+          public void onStart() {
+            if (startLatchCountDown != null) {
+              startLatchCountDown.countDown();
+            }
+          }
+
+          @Override
+          public void onShutdown() {
+            if (stopLatchCountDown != null) {
+              stopLatchCountDown.countDown();
+            }
+            LOG.info("onShutdown is called for pid=" + pId);
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            LOG.info("onFailure is called for pid=" + pId);
+          }
+        });
+
+    return processor;
+  }
+
+  protected void createTopics(String inputTopic, String outputTopic) {
+    TestUtils.createTopic(zkUtils(), inputTopic, 5, 1, servers(), new Properties());
+    TestUtils.createTopic(zkUtils(), outputTopic, 5, 1, servers(), new Properties());
+  }
+
+  protected Map<String, String> createConfigs(String testSystem, String inputTopic, String outputTopic,
+      int messageCount) {
+    Map<String, String> configs = new HashMap<>();
+    configs.putAll(StandaloneTestUtils
+        .getStandaloneConfigs("test-job", "org.apache.samza.test.processor.TestZkStreamProcessor.TestStreamTask"));
+    configs.putAll(StandaloneTestUtils
+        .getKafkaSystemConfigs(testSystem, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING,
+            true));
+    configs.put("task.inputs", String.format("%s.%s", testSystem, inputTopic));
+    configs.put("app.messageCount", String.valueOf(messageCount));
+    configs.put("app.outputTopic", outputTopic);
+    configs.put("app.outputSystem", testSystem);
+    configs.put(ZkConfig.ZK_CONNECT, zkConnect());
+
+    configs.put("job.systemstreampartition.grouper.factory",
+        "org.apache.samza.container.grouper.stream.GroupByPartitionFactory");
+    configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.zk.ZkJobCoordinatorFactory");
+
+    return configs;
+  }
+
+  /**
+   * Produces the provided number of messages to the topic.
+   */
+  protected void produceMessages(final int start, String topic, int numMessages) {
+    KafkaProducer producer = getKafkaProducer();
+    for (int i = start; i < numMessages + start; i++) {
+      try {
+        LOG.info("producing " + i);
+        producer.send(new ProducerRecord(topic, String.valueOf(i).getBytes())).get();
+      } catch (InterruptedException | ExecutionException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Runs the provided stream processor by starting it, waiting on the provided latch with a timeout,
+   * and then stopping it.
+   */
+  protected Thread runInThread(final StreamProcessor processor, CountDownLatch latch) {
+    Thread t = new Thread() {
+
+      @Override
+      public void run() {
+        processor.start();
+        try {
+          // just wait
+          synchronized (this) {
+            this.wait(100000);
+          }
+          LOG.info("notified. Abandon the wait.");
+        } catch (InterruptedException e) {
+          LOG.error("wait interrupted" + e);
+        }
+        LOG.info("Stopping the processor");
+        processor.stop();
+      }
+    };
+    return t;
+  }
+
+  // for sequential values we can generate them automatically
+  protected void verifyNumMessages(String topic, int numberOfSequentialValues, int exectedNumMessages) {
+    // we should get each value one time
+    // create a map of all expected values to validate
+    Map<Integer, Boolean> expectedValues = new HashMap<>(numberOfSequentialValues);
+    for (int i = 0; i < numberOfSequentialValues; i++) {
+      expectedValues.put(i, false);
+    }
+    verifyNumMessages(topic, expectedValues, exectedNumMessages);
+  }
+
+  /**
+   * Consumes data from the topic until there are no new messages for a while
+   * and asserts that the number of consumed messages is as expected.
+   */
+  protected void verifyNumMessages(String topic, final Map<Integer, Boolean> expectedValues, int expectedNumMessages) {
+    KafkaConsumer consumer = getKafkaConsumer();
+    consumer.subscribe(Collections.singletonList(topic));
+
+    Map<Integer, Boolean> map = new HashMap<>(expectedValues);
+    int count = 0;
+    int emptyPollCount = 0;
+
+    while (count < expectedNumMessages && emptyPollCount < 5) {
+      ConsumerRecords records = consumer.poll(5000);
+      if (!records.isEmpty()) {
+        Iterator<ConsumerRecord> iterator = records.iterator();
+        while (iterator.hasNext()) {
+          ConsumerRecord record = iterator.next();
+          String val = new String((byte[]) record.value());
+          LOG.info("Got value " + val);
+          map.put(Integer.valueOf(val), true);
+          count++;
+        }
+      } else {
+        emptyPollCount++;
+        LOG.warn("empty polls " + emptyPollCount);
+      }
+    }
+    // filter out numbers we did not get
+    long numFalse = map.values().stream().filter(v -> !v).count();
+    Assert.assertEquals("didn't get this number of events ", 0, numFalse);
+    Assert.assertEquals(expectedNumMessages, count);
+  }
+
+  // StreamTaskClass
+  public static class TestStreamTask implements StreamTask, InitableTask {
+    // static field since there's no other way to share state b/w a task instance and
+    // stream processor when constructed from "task.class".
+    static CountDownLatch endLatch;
+    private int processedMessageCount = 0;
+    private String processorId;
+    private String outputTopic;
+    private String outputSystem;
+
+    @Override
+    public void init(Config config, TaskContext taskContext)
+        throws Exception {
+      this.processorId = config.get(ApplicationConfig.PROCESSOR_ID);
+      this.outputTopic = config.get("app.outputTopic", "output");
+      this.outputSystem = config.get("app.outputSystem", "test-system");
+    }
+
+    @Override
+    public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector,
+        TaskCoordinator taskCoordinator)
+        throws Exception {
+
+      Object message = incomingMessageEnvelope.getMessage();
+
+      // inject a failure
+      if (Integer.valueOf((String) message) >= BAD_MESSAGE_KEY && processorId.equals("1")) {
+        LOG.info("process method will fail for msg=" + message);
+        throw new Exception("Processing in the processor " + processorId + " failed ");
+      }
+
+      messageCollector.send(new OutgoingMessageEnvelope(new SystemStream(outputSystem, outputTopic), message));
+      processedMessageCount++;
+
+      LOG.info("Stream processor " + processorId + ";offset=" + incomingMessageEnvelope.getOffset() + "; totalRcvd="
+              + processedMessageCount + ";received " + message + "; ssp=" + incomingMessageEnvelope
+              .getSystemStreamPartition());
+
+      synchronized (endLatch) {
+        endLatch.countDown();
+      }
+    }
+  }
+
+  protected void waitUntilConsumedN(int untilLeft) {
+    int attempts = ATTEMPTS_NUMBER;
+    while (attempts > 0) {
+      long leftEventsCount = TestZkStreamProcessorBase.TestStreamTask.endLatch.getCount();
+      //System.out.println("2current count = " + leftEventsCount);
+      if (leftEventsCount == untilLeft) { // should read all of them
+        //System.out.println("2read all. current count = " + leftEventsCount);
+        break;
+      }
+      TestZkUtils.sleepMs(1000);
+      attempts--;
+    }
+    Assert.assertTrue("Didn't read all the leftover events in " + ATTEMPTS_NUMBER + " attempts", attempts > 0);
+  }
+}


[5/6] samza git commit: SAMZA-1280; document for the general/universal resource localization in YARN

Posted by xi...@apache.org.
SAMZA-1280; document for the general/universal resource localization in YARN

This PR added a MD for localizing resource in Samza on YARN  by configuring path, local.name, local.type and local.visibility, and also updated the configuration table and index table.

Author: Fred Ji <fj...@linkedin.com>

Reviewers: Navina Ramesh <na...@apache.org>

Closes #191 from fredji97/resourceLocalizationDoc


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8ff402ce
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8ff402ce
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8ff402ce

Branch: refs/heads/0.13.0
Commit: 8ff402ce28e390b602ed380fc0fee786b2fddc1d
Parents: d72bf83
Author: Fred Ji <fj...@linkedin.com>
Authored: Mon May 22 15:09:32 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon May 22 17:56:48 2017 -0700

----------------------------------------------------------------------
 docs/learn/documentation/versioned/index.html   |   3 +-
 .../versioned/jobs/configuration-table.html     |  43 +++++++-
 .../versioned/yarn/yarn-host-affinity.md        |   2 +-
 .../yarn/yarn-resource-localization.md          | 108 +++++++++++++++++++
 .../versioned/yarn/yarn-security.md             |   2 +-
 5 files changed, 154 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/8ff402ce/docs/learn/documentation/versioned/index.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/index.html b/docs/learn/documentation/versioned/index.html
index e3cecc1..a710383 100644
--- a/docs/learn/documentation/versioned/index.html
+++ b/docs/learn/documentation/versioned/index.html
@@ -81,9 +81,10 @@ title: Documentation
   <li><a href="yarn/application-master.html">Application Master</a></li>
   <li><a href="yarn/isolation.html">Isolation</a></li>
   <li><a href="yarn/yarn-host-affinity.html">Host Affinity & Yarn</a></li>
+  <li><a href="yarn/yarn-resource-localization.html">Resource Localization</a></li>
+  <li><a href="yarn/yarn-security.html">Yarn Security</a></li>
   <li><a href="hdfs/producer.html">Writing to HDFS</a></li>
   <li><a href="hdfs/consumer.html">Reading from HDFS</a></li>
-  <li><a href="yarn/yarn-security.html">Yarn Security</a></li>
 <!-- TODO write yarn pages
   <li><a href="">Fault Tolerance</a></li>
   <li><a href="">Security</a></li>

http://git-wip-us.apache.org/repos/asf/samza/blob/8ff402ce/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index fe1580f..f34146c 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -59,7 +59,7 @@
                 font-family: monospace;
             }
 
-            span.system, span.stream, span.store, span.serde, span.rewriter, span.listener, span.reporter {
+            span.system, span.stream, span.store, span.serde, span.rewriter, span.listener, span.reporter, span.resource {
                 padding: 1px;
                 margin: 1px;
                 border-width: 1px;
@@ -101,6 +101,11 @@
                 background-color: #dff;
                 border-color: #bdd;
             }
+
+            span.resource {
+                background-color: #ded;
+                border-color: #bcb;
+            }
         </style>
     </head>
 
@@ -1935,6 +1940,42 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="yarn-resources-resource-name-path">yarn.resources.<span class="resource">resource-name</span>.path</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        The path for localizing the resource for <span class="resource">resource-name</span>. The scheme (e.g. http, ftp, hdsf, file, etc) in the path should be configured in YARN core-site.xml as fs.&lt;scheme&gt;.impl and is associated with a <a href="https://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/fs/FileSystem.html">FileSystem</a>.
+                        If defined, the resource will be localized in the Samza application directory before the Samza job runs. More details can be found <a href="../yarn/yarn-resource-localization.html">here</a>.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="yarn-resources-resource-name-local-name">yarn.resources.<span class="resource">resource-name</span>.local.name</td>
+                    <td class="default"><span class="resource">resource-name</span></td>
+                    <td class="description">
+                        The new local name for the resource after localization.
+                        This configuration only applies when yarn.resources.<span class="resource">resource-name</span>.path is configured.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="yarn-resources-resource-name-local-type">yarn.resources.<span class="resource">resource-name</span>.local.type</td>
+                    <td class="default">FILE</td>
+                    <td class="description">
+                        The type for the resource after localization. It can be ARCHIVE (archived directory), FILE, or PATTERN (the entries extracted from the archive with the pattern).
+                        This configuration only applies when yarn.resources.<span class="resource">resource-name</span>.path is configured.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="yarn-resources-resource-name-local-visibility">yarn.resources.<span class="resource">resource-name</span>.local.visibility</td>
+                    <td class="default">APPLICATION</td>
+                    <td class="description">
+                        The visibility for the resource after localization. It can be PUBLIC (visible to everyone), PRIVATE (visible to all Samza applications of the same account user as this application), or APPLICATION (visible to only this Samza application).
+                        This configuration only applies when yarn.resources.<span class="resource">resource-name</span>.path is configured.
+                    </td>
+                </tr>
+
+                <tr>
                     <th colspan="3" class="section" id="metrics"><a href="../container/metrics.html">Metrics</a></th>
                 </tr>
 

http://git-wip-us.apache.org/repos/asf/samza/blob/8ff402ce/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md b/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md
index 13bc9b9..14e10cc 100644
--- a/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md
+++ b/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md
@@ -119,4 +119,4 @@ As you have observed, host-affinity cannot be guaranteed all the time due to var
 1. _When the number of containers and/or container-task assignment changes across successive application runs_ - We may be able to re-use local state for a subset of partitions. Currently, there is no logic in the Job Coordinator to handle partitioning of tasks among containers intelligently. Handling this is more involved as relates to [auto-scaling](https://issues.apache.org/jira/browse/SAMZA-336) of the containers. However, with [task-container mapping](https://issues.apache.org/jira/browse/SAMZA-906), this will work better for typical container count adjustments.
 2. _When SystemStreamPartitionGrouper changes across successive application runs_ - When the grouper logic used to distribute the partitions across containers changes, the data in the Coordinator Stream (for changelog-task partition assignment etc) and the data stores becomes invalid. Thus, to be safe, we should flush out all state-related data from the Coordinator Stream. An alternative is to overwrite the Task-ChangelogPartition assignment message and the Container Locality message in the Coordinator Stream, before starting up the job again.
 
-## [Writing to HDFS &raquo;](../hdfs/producer.html)
\ No newline at end of file
+## [Resource Localization &raquo;](../operations/resource-localization.html)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/8ff402ce/docs/learn/documentation/versioned/yarn/yarn-resource-localization.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/yarn/yarn-resource-localization.md b/docs/learn/documentation/versioned/yarn/yarn-resource-localization.md
new file mode 100644
index 0000000..a55670b
--- /dev/null
+++ b/docs/learn/documentation/versioned/yarn/yarn-resource-localization.md
@@ -0,0 +1,108 @@
+---
+layout: page
+title: YARN Resource Localization
+---
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+When Samza jobs run on YARN clusters, sometimes there are needs to preload some files or data (called as resources in this doc) before job starts, such as preparing the job package, fetching job certificate, or etc., Samza supports a general configuration way to localize difference resources.
+
+### Resource Localization Process
+
+For the Samza jobs running on YARN, the resource localization leverages the YARN node manager localization service. Here is a good [deep dive](https://hortonworks.com/blog/resource-localization-in-yarn-deep-dive/) from Horton Works on how localization works in YARN. 
+
+Depending on where and how the resource comes from, fetching the resource is associated with a scheme in the path, such as `http`, `https`, `hdfs`, `ftp`, `file`, etc., which maps to a certain FileSystem for handling the localization. 
+
+If there is an implementation of [FileSystem](https://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/fs/FileSystem.html) on YARN supporting a scheme, then that scheme can be used for resource localization. 
+
+There are some predefined file systems in Hadoop or Samza, which are provided if you run Samza jobs on YARN:
+
+* `org.apache.samza.util.hadoop.HttpFileSystem`: used for fetching resources based on http, or https without client side authentication requirement.
+* `org.apache.hadoop.hdfs.DistributedFileSystem`: used for fetching resource from DFS system on Hadoop.
+* `org.apache.hadoop.fs.LocalFileSystem`: used for copying resources from local file system to the job directory.
+* `org.apache.hadoop.fs.ftp.FTPFileSystem`: used for fetching resources based on ftp.
+* ...
+
+If you would like to have your own file system, you need to implement a class which extends from `org.apache.hadoop.fs.FileSystem`. 
+
+### Job Configuration
+With the configuration properly defined, the resources a job requiring from external or internal locations may be prepared automatically before it runs.
+
+For each resource with the name `<resourceName>` in the Samza job, the following set of job configurations are used when running on a YARN cluster. The first one which definiing resource path is required, but the others are optional and they have default values.
+
+1. `yarn.resources.<resourceName>.path`
+    * Required
+    * The path for fetching the resource for localization, e.g. http://hostname.com/packages/mySamzaJob
+2. `yarn.resources.<resourceName>.local.name`
+    * Optional 
+    * The local name used for the localized resource.
+    * If not set, the default one will be `<resourceName>` from the config key.
+3. `yarn.resources.<resourceName>.local.type`
+    * Optional 
+    * Localized resource type with valid values from: `ARCHIVE`, `FILE`, `PATTERN`.
+        * ARCHIVE: the localized resource will be an archived directory;
+        * FILE: the localized resource will be a file;
+        * PATTERN: the localized resource will be the entries extracted from the archive with the pattern.
+    * If not set, the default value is `FILE`.
+4. `yarn.resources.<resourceName>.local.visibility`
+    * Optional
+    * Localized resource visibility for the resource, and it can be a value from `PUBLIC`, `PRIVATE`, `APPLICATION`
+        * PUBLIC: visible to everyone 
+        * PRIVATE: visible to just the account which runs the job
+        * APPLICATION: visible only to the specific application job which has the resource configuration
+    * If not set, the default value is `APPLICATION`
+
+It is up to you how to name the resource, but `<resourceName>` should be the same in the above configurations to apply to the same resource. 
+
+### YARN Configuration
+Make sure the scheme used in the yarn.resources.&lt;resourceName&gt;.path is configured in YARN core-site.xml with a FileSystem implementation. For example, for scheme `http`, you should have the following property in YARN core-site.xml:
+
+{% highlight xml %}
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+    <property>
+      <name>fs.http.impl</name>
+      <value>org.apache.samza.util.hadoop.HttpFileSystem</value>
+    </property>
+</configuration>
+{% endhighlight %}
+
+You can override a behavior for a scheme by linking it to another file system. For example, you have a special need for localizing a resource for your job through http request, you may implement your own Http File System by extending [FileSystem](https://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/fs/FileSystem.html), and have the following configuration:
+
+{% highlight xml %}
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+    <property>
+      <name>fs.http.impl</name>
+      <value>com.myCompany.MyHttpFileSystem</value>
+    </property>
+</configuration>
+{% endhighlight %}
+
+If you are using other scheme which is not defined in Hadoop or Samza, for example, `yarn.resources.mySampleResource.path=myScheme://host.com/test`, in your job configuration, you may implement your own [FileSystem](https://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/fs/FileSystem.html) such as com.myCompany.MySchemeFileSystem and link it with your own scheme in yarn core-site.xml configuration.
+
+{% highlight xml %}
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+    <property>
+      <name>fs.myScheme.impl</name>
+      <value>com.myCompany.MySchemeFileSystem</value>
+    </property>
+</configuration>
+{% endhighlight %}
+
+## [Yarn Security &raquo;](../yarn/yarn-security.html)

http://git-wip-us.apache.org/repos/asf/samza/blob/8ff402ce/docs/learn/documentation/versioned/yarn/yarn-security.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/yarn/yarn-security.md b/docs/learn/documentation/versioned/yarn/yarn-security.md
index 8d164c6..7b66ed8 100644
--- a/docs/learn/documentation/versioned/yarn/yarn-security.md
+++ b/docs/learn/documentation/versioned/yarn/yarn-security.md
@@ -91,4 +91,4 @@ yarn.token.renewal.interval.seconds=86400
         </property>
     {% endhighlight %}
 
-## [Security &raquo;](../operations/security.html)
+## [Writing to HDFS &raquo;](../hdfs/producer.html)


[2/6] samza git commit: SAMZA-1193. re-enable the test TestZkBarrierForVersionUpgrade.testZkBarrierForVersionUpgrade

Posted by xi...@apache.org.
SAMZA-1193. re-enable the test TestZkBarrierForVersionUpgrade.testZkBarrierForVersionUpgrade

The code changed significantly. I've rerun the test multiple times both with gradle and intelij. It passed every time. I suggest we enable it back.

Author: Boris Shkolnik <bo...@apache.org>

Reviewers: Xinyu Liu <xi...@apache.org>

Closes #181 from sborya/testZkBarrierForVersionUpgrade


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3054e532
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3054e532
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3054e532

Branch: refs/heads/0.13.0
Commit: 3054e5322a0c4973f33ef32149d31a0ae5b713b9
Parents: 4011459
Author: Boris Shkolnik <bo...@apache.org>
Authored: Fri May 19 17:25:26 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon May 22 17:56:21 2017 -0700

----------------------------------------------------------------------
 .../zk/TestZkBarrierForVersionUpgrade.java      | 31 ++++++++------------
 1 file changed, 12 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3054e532/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index f1bb804..547e32b 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -18,6 +18,11 @@
  */
 package org.apache.samza.zk;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import junit.framework.Assert;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
@@ -27,33 +32,26 @@ import org.apache.samza.coordinator.CoordinationServiceFactory;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 
 public class TestZkBarrierForVersionUpgrade {
   private static EmbeddedZookeeper zkServer = null;
   private static String testZkConnectionString = null;
   private static CoordinationUtils coordinationUtils;
 
+  private static AtomicInteger counter = new AtomicInteger(1);
+
+
+  @Before
+  public void testSetup() {
 
-  @BeforeClass
-  public static void setup() throws InterruptedException {
     zkServer = new EmbeddedZookeeper();
     zkServer.setup();
     testZkConnectionString = "127.0.0.1:" + zkServer.getPort();
-  }
 
-  @Before
-  public void testSetup() {
-    String groupId = "group1";
+    String groupId = "group" + counter.getAndAdd(1);
     String processorId = "p1";
     Map<String, String> map = new HashMap<>();
     map.put(ZkConfig.ZK_CONNECT, testZkConnectionString);
@@ -67,15 +65,10 @@ public class TestZkBarrierForVersionUpgrade {
   @After
   public void testTearDown() {
     coordinationUtils.reset();
-  }
-
-  @AfterClass
-  public static void teardown() {
     zkServer.teardown();
   }
 
-  // TODO: SAMZA-1193 fix the following flaky test and re-enable it
-  // @Test
+  @Test
   public void testZkBarrierForVersionUpgrade() {
     String barrierId = "b1";
     String ver = "1";


[6/6] samza git commit: Improve documentation for Resource Localization

Posted by xi...@apache.org.
Improve documentation for Resource Localization

This is a follow-up to Fred Ji's original PR : https://github.com/apache/samza/pull/191 .

Author: vjagadish1989 <jv...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@linkedin.com>

Closes #199 from vjagadish1989/doc-improvements


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/06860479
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/06860479
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/06860479

Branch: refs/heads/0.13.0
Commit: 06860479f35b14336864b2bea99120cf8809a46f
Parents: 8ff402c
Author: vjagadish1989 <jv...@linkedin.com>
Authored: Mon May 22 17:42:32 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon May 22 17:56:57 2017 -0700

----------------------------------------------------------------------
 .../yarn/yarn-resource-localization.md          | 59 +++++++-------------
 1 file changed, 19 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/06860479/docs/learn/documentation/versioned/yarn/yarn-resource-localization.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/yarn/yarn-resource-localization.md b/docs/learn/documentation/versioned/yarn/yarn-resource-localization.md
index a55670b..3d1c87a 100644
--- a/docs/learn/documentation/versioned/yarn/yarn-resource-localization.md
+++ b/docs/learn/documentation/versioned/yarn/yarn-resource-localization.md
@@ -18,58 +18,49 @@ title: YARN Resource Localization
    See the License for the specific language governing permissions and
    limitations under the License.
 -->
-
-When Samza jobs run on YARN clusters, sometimes there are needs to preload some files or data (called as resources in this doc) before job starts, such as preparing the job package, fetching job certificate, or etc., Samza supports a general configuration way to localize difference resources.
+When running Samza jobs on YARN clusters, you may need to download some resources before startup (For example, downloading the job binaries, fetching certificate files etc.) This step is called as Resource Localization.
 
 ### Resource Localization Process
 
-For the Samza jobs running on YARN, the resource localization leverages the YARN node manager localization service. Here is a good [deep dive](https://hortonworks.com/blog/resource-localization-in-yarn-deep-dive/) from Horton Works on how localization works in YARN. 
-
-Depending on where and how the resource comes from, fetching the resource is associated with a scheme in the path, such as `http`, `https`, `hdfs`, `ftp`, `file`, etc., which maps to a certain FileSystem for handling the localization. 
+For Samza jobs running on YARN, resource localization leverages the YARN node manager's localization service. Here is a [deep dive](https://hortonworks.com/blog/resource-localization-in-yarn-deep-dive/) on how localization works in YARN. 
 
-If there is an implementation of [FileSystem](https://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/fs/FileSystem.html) on YARN supporting a scheme, then that scheme can be used for resource localization. 
+Depending on where and how the resource comes from, fetching the resource is associated with a scheme in the path (such as `http`, `https`, `hdfs`, `ftp`, `file`, etc). The scheme maps to a corresponding `FileSystem` implementation for handling the localization. 
 
-There are some predefined file systems in Hadoop or Samza, which are provided if you run Samza jobs on YARN:
+There are some predefined `FileSystem` implementations in Hadoop and Samza, which are provided if you run Samza jobs on YARN:
 
-* `org.apache.samza.util.hadoop.HttpFileSystem`: used for fetching resources based on http, or https without client side authentication requirement.
+* `org.apache.samza.util.hadoop.HttpFileSystem`: used for fetching resources based on http or https without client side authentication.
 * `org.apache.hadoop.hdfs.DistributedFileSystem`: used for fetching resource from DFS system on Hadoop.
 * `org.apache.hadoop.fs.LocalFileSystem`: used for copying resources from local file system to the job directory.
 * `org.apache.hadoop.fs.ftp.FTPFileSystem`: used for fetching resources based on ftp.
-* ...
 
-If you would like to have your own file system, you need to implement a class which extends from `org.apache.hadoop.fs.FileSystem`. 
+You can create your own file system implementation by creating a class which extends from `org.apache.hadoop.fs.FileSystem`. 
 
-### Job Configuration
-With the configuration properly defined, the resources a job requiring from external or internal locations may be prepared automatically before it runs.
-
-For each resource with the name `<resourceName>` in the Samza job, the following set of job configurations are used when running on a YARN cluster. The first one which definiing resource path is required, but the others are optional and they have default values.
+### Resource Configuration
+You can specify a resource to be localized by the following configuration.
 
+#### Required Configuration
 1. `yarn.resources.<resourceName>.path`
-    * Required
-    * The path for fetching the resource for localization, e.g. http://hostname.com/packages/mySamzaJob
+    * The path for fetching the resource for localization, e.g. http://hostname.com/packages/myResource
+
+#### Optional Configuration
 2. `yarn.resources.<resourceName>.local.name`
-    * Optional 
     * The local name used for the localized resource.
-    * If not set, the default one will be `<resourceName>` from the config key.
+    * If it is not set, the default will be the `<resourceName>` specified in `yarn.resources.<resourceName>.path`
 3. `yarn.resources.<resourceName>.local.type`
-    * Optional 
-    * Localized resource type with valid values from: `ARCHIVE`, `FILE`, `PATTERN`.
+    * The type of the resource with valid values from: `ARCHIVE`, `FILE`, `PATTERN`.
         * ARCHIVE: the localized resource will be an archived directory;
         * FILE: the localized resource will be a file;
         * PATTERN: the localized resource will be the entries extracted from the archive with the pattern.
-    * If not set, the default value is `FILE`.
+    * If it is not set, the default value is `FILE`.
 4. `yarn.resources.<resourceName>.local.visibility`
-    * Optional
-    * Localized resource visibility for the resource, and it can be a value from `PUBLIC`, `PRIVATE`, `APPLICATION`
+    * Visibility for the resource with valid values from `PUBLIC`, `PRIVATE`, `APPLICATION`
         * PUBLIC: visible to everyone 
         * PRIVATE: visible to just the account which runs the job
         * APPLICATION: visible only to the specific application job which has the resource configuration
-    * If not set, the default value is `APPLICATION`
-
-It is up to you how to name the resource, but `<resourceName>` should be the same in the above configurations to apply to the same resource. 
+    * If it is not set, the default value is `APPLICATION`
 
 ### YARN Configuration
-Make sure the scheme used in the yarn.resources.&lt;resourceName&gt;.path is configured in YARN core-site.xml with a FileSystem implementation. For example, for scheme `http`, you should have the following property in YARN core-site.xml:
+Make sure the scheme used in the `yarn.resources.<resourceName>.path` is configured with a corresponding FileSystem implementation in YARN core-site.xml.
 
 {% highlight xml %}
 <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
@@ -81,19 +72,7 @@ Make sure the scheme used in the yarn.resources.&lt;resourceName&gt;.path is con
 </configuration>
 {% endhighlight %}
 
-You can override a behavior for a scheme by linking it to another file system. For example, you have a special need for localizing a resource for your job through http request, you may implement your own Http File System by extending [FileSystem](https://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/fs/FileSystem.html), and have the following configuration:
-
-{% highlight xml %}
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<configuration>
-    <property>
-      <name>fs.http.impl</name>
-      <value>com.myCompany.MyHttpFileSystem</value>
-    </property>
-</configuration>
-{% endhighlight %}
-
-If you are using other scheme which is not defined in Hadoop or Samza, for example, `yarn.resources.mySampleResource.path=myScheme://host.com/test`, in your job configuration, you may implement your own [FileSystem](https://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/fs/FileSystem.html) such as com.myCompany.MySchemeFileSystem and link it with your own scheme in yarn core-site.xml configuration.
+If you are using your own scheme (for example, `yarn.resources.myResource.path=myScheme://host.com/test`), you can link your [FileSystem](https://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/fs/FileSystem.html) implementation with it as follows.
 
 {% highlight xml %}
 <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>


[4/6] samza git commit: SAMZA-1298; Create zk path.

Posted by xi...@apache.org.
SAMZA-1298; Create zk path.

if ZK path contains extra path at the end, it needs to be created in ZK at first connect.

Author: Boris Shkolnik <bo...@apache.org>

Reviewers: Jagadish <ja...@apache.org>

Closes #197 from sborya/createZkPath


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d72bf837
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d72bf837
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d72bf837

Branch: refs/heads/0.13.0
Commit: d72bf837e7973ffc1e1d317326de168bcf5cc315
Parents: f9849c5
Author: Boris Shkolnik <bo...@apache.org>
Authored: Mon May 22 11:58:53 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon May 22 17:56:39 2017 -0700

----------------------------------------------------------------------
 .../samza/zk/ZkCoordinationServiceFactory.java  | 52 ++++++++++++++++++--
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 15 ++----
 .../apache/samza/zk/TestZkLeaderElector.java    | 17 +++----
 .../apache/samza/zk/TestZkProcessorLatch.java   | 20 ++++----
 .../java/org/apache/samza/zk/TestZkUtils.java   | 27 ++++++++--
 5 files changed, 93 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d72bf837/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
index 9971732..661650d 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
@@ -18,27 +18,69 @@
  */
 package org.apache.samza.zk;
 
+import com.google.common.base.Strings;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.CoordinationServiceFactory;
 import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.zookeeper.client.ConnectStringParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class ZkCoordinationServiceFactory implements CoordinationServiceFactory {
+  private final static Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class);
+
   // TODO - Why should this method be synchronized?
   synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
     ZkConfig zkConfig = new ZkConfig(config);
-    ZkClient zkClient;
+
+    ZkClient zkClient = createZkClient(zkConfig.getZkConnect(),
+        zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+
+    // make sure the 'path' exists
+    createZkPath(zkConfig.getZkConnect(), zkClient);
+
+    ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
+
+    return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, new ScheduleAfterDebounceTime());
+  }
+
+  /**
+   * helper method to create zkClient
+   * @param connectString - zkConnect string
+   * @param sessionTimeoutMS - session timeout
+   * @param connectionTimeoutMs - connection timeout
+   * @return zkClient object
+   */
+  public static ZkClient createZkClient(String connectString, int sessionTimeoutMS, int connectionTimeoutMs) {
     try {
-      zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+      return new ZkClient(connectString, sessionTimeoutMS, connectionTimeoutMs);
     } catch (Exception e) {
-      throw new SamzaException("zkClient failed to connect to ZK at :" + zkConfig.getZkConnect(), e);
+      // ZkClient constructor may throw a variety of different exceptions, not all of them Zk based.
+      throw new SamzaException("zkClient failed to connect to ZK at :" + connectString, e);
     }
+  }
 
-    ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
-    return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, new ScheduleAfterDebounceTime());
+  /**
+   * if ZkConnectString contains some path at the end, it needs to be created when connecting for the first time.
+   * @param zkConnect - connect string
+   * @param zkClient - zkClient object to talk to the ZK
+   */
+  public static void createZkPath(String zkConnect, ZkClient zkClient) {
+    ConnectStringParser parser = new ConnectStringParser(zkConnect);
+
+    String path = parser.getChrootPath();
+    LOG.info("path =" + path);
+    if (!Strings.isNullOrEmpty(path)) {
+      // create this path in zk
+      LOG.info("first connect. creating path =" + path + " in ZK " + parser.getServerAddresses());
+      if (!zkClient.exists(path)) {
+        zkClient.createPersistent(path, true); // will create parents if needed and will not throw exception if exists
+      }
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/d72bf837/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 5c8fcf3..c547901 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -19,6 +19,11 @@
 
 package org.apache.samza.zk;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
@@ -32,12 +37,6 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Util class to help manage Zk connection and ZkClient.
  * It also provides additional utility methods for read/write/subscribe/unsubscribe access to the ZK tree.
@@ -84,10 +83,6 @@ public class ZkUtils {
     return new ZkConnection(zkConnectString, sessionTimeoutMs);
   }
 
-  public static ZkClient createZkClient(ZkConnection zkConnection, int connectionTimeoutMs) {
-    return new ZkClient(zkConnection, connectionTimeoutMs);
-  }
-
   ZkClient getZkClient() {
     return zkClient;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/d72bf837/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index 7cfad61..393d733 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -18,8 +18,13 @@
  */
 package org.apache.samza.zk;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
@@ -32,12 +37,6 @@ import org.junit.Test;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -433,10 +432,10 @@ public class TestZkLeaderElector {
   }
 
   private ZkUtils getZkUtilsWithNewClient() {
-    ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
+    ZkClient zkClient = ZkCoordinationServiceFactory.createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS);
     return new ZkUtils(
         KEY_BUILDER,
-        ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
+        zkClient,
         CONNECTION_TIMEOUT_MS);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/d72bf837/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
index 2385b32..9f089a0 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
@@ -18,7 +18,13 @@
  */
 package org.apache.samza.zk;
 
-import org.I0Itec.zkclient.ZkConnection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.junit.After;
@@ -28,13 +34,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 /**
  * The ZkProcessorLatch uses a shared Znode as a latch. Each participant await existence of a target znode under the
  * shared latch, which is a persistent, sequential target znode with value (latchSize - 1). latchSize is the minimum
@@ -215,10 +214,11 @@ public class TestZkProcessorLatch {
 
   }
   private ZkUtils getZkUtilsWithNewClient(String processorId) {
-    ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
+    ZkClient zkClient = ZkCoordinationServiceFactory
+        .createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS);
     return new ZkUtils(
         KEY_BUILDER,
-        ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
+        zkClient,
         CONNECTION_TIMEOUT_MS);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/d72bf837/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 63e2361..173b8a6 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -18,7 +18,10 @@
  */
 package org.apache.samza.zk;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.function.BooleanSupplier;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
@@ -35,10 +38,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.BooleanSupplier;
-
 public class TestZkUtils {
   private static EmbeddedZookeeper zkServer = null;
   private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
@@ -87,6 +86,26 @@ public class TestZkUtils {
     zkServer.teardown();
   }
 
+
+  @Test
+  public void testInitZkPath() {
+    String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1";
+    ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient);
+
+    Assert.assertTrue(zkClient.exists("/samza1"));
+
+    zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1/samza2";
+    ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient);
+
+    Assert.assertTrue(zkClient.exists("/samza1/samza2"));
+
+
+    zkConnect = "127.0.0.1:" + zkServer.getPort(); // empty path.
+    ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient);
+
+    Assert.assertTrue(zkClient.exists("/"));
+  }
+
   @Test
   public void testRegisterProcessorId() {
     String assignedPath = zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1"));


[3/6] samza git commit: SAMZA-1291: StandAlone config

Posted by xi...@apache.org.
SAMZA-1291: StandAlone config

Author: Boris Shkolnik <bo...@apache.org>

Reviewers: Xinyu Liu <xi...@apache.org>

Closes #192 from sborya/StandAloneConfig


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f9849c5c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f9849c5c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f9849c5c

Branch: refs/heads/0.13.0
Commit: f9849c5ce5c4aa3592738d7405428c2f2690a3ba
Parents: 3054e53
Author: Boris Shkolnik <bo...@apache.org>
Authored: Mon May 22 09:36:12 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon May 22 17:56:30 2017 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     | 48 ++++++++++++++++++++
 .../java/org/apache/samza/config/ZkConfig.java  | 14 +++---
 .../zk/TestZkBarrierForVersionUpgrade.java      |  2 +-
 3 files changed, 56 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f9849c5c/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index afa42f5..fe1580f 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -409,6 +409,53 @@
                         <a href="#stores-changelog" class="property">stores.store-name.changelog</a>.
                     </td>
                 </tr>
+                 <tr>
+                    <td class="property" id="job.coordinator.factory">job.coordinator.factory</td>
+                    <td class="default"></td>
+                    <td class="description">
+			Class to use for job coordination. Currently available values are:
+                       <dl>
+                            <dt><code>org.apache.samza.standalone.StandaloneJobCoordinatorFactory</code></dt>
+                            <dd>Fixed partition mapping. No Zoookeeper. </dd>
+                            <dt><code>org.apache.samza.zk.ZkJobCoordinatorFactory</code></dt>
+                            <dd>Zookeeper-based coordination. </dd>
+                        </dl>
+                        Required only for non-cluster-managed applications. Please see the required value for <a href=#task-name-grouper-factory>task-name-grouper-factory </a>
+                    </td>
+                </tr>
+
+                <tr>
+                                              <!-- change link to StandAlone design/tutorial doc. SAMZA-1299 -->
+                <th colspan="3" class="section" id="ZkBasedJobCoordination"><a href="../index.html">Zookeeper-based job configuration</a></th>
+                </tr>
+                <tr>
+                    <td class="property" id="job.coordinator.zk.connect">job.coordinator.zk.connect</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        <strong>Required</strong> for applications with Zookeeper-based coordination. Zookeeper coordinates (in "host:port[/znode]" format) to be used for coordination.
+                    </td>
+                </tr>
+                <tr>
+                    <td class="property" id="job.coordinator.zk.session.timeout.ms">job.coordinator.zk.session.timeout.ms</td>
+                    <td class="default"> 30000 </td>
+                    <td class="description">
+                        Zookeeper session timeout for all the ZK connections in milliseconds. Session timeout controls how long zk client will wait before throwing an exception, when it cannot talk to one of ZK servers.
+                    </td>
+                </tr>
+                <tr>
+                    <td class="property" id="job.coordinator.zk.connection.timeout.ms">job.coordinator.zk.connection.timeout.ms</td>
+                    <td class="default"> 60000 </td>
+                    <td class="description">
+                        Zookeeper connection timeout in milliseconds. Zk connection timeout controls how long client tries to connect to ZK server before giving up.
+                    </td>
+                </tr>
+                <tr>
+                    <td class="property" id="job.coordinator.zk.new.consensus.timeout.ms">job.coordinator.zk.consensus.timeout.ms</td>
+                    <td class="default"> 40000 </td>
+                    <td class="description">
+			How long each processor will wait for all the processors to report acceptance of the new job model before rolling back.
+                    </td>
+                </tr>
                 <tr>
                     <th colspan="3" class="section" id="task"><a href="../api/overview.html">Task configuration</a></th>
                 </tr>
@@ -647,6 +694,7 @@
                         The fully-qualified name of the Java class which determines the factory class which will build the TaskNameGrouper.
                         The default configuration value if the property is not present is <code>task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerCountFactory</code>.<br>
                         The user can specify a custom implementation of the TaskNameGrouperFactory where a custom logic is implemented for grouping the tasks.
+                    <p><strong>Note:</strong> For non-cluster applications (ones using coordination service) one must use <i>org.apache.samza.container.grouper.task.GroupByContainerIdsFactory</i>
                     </td>
                 </tr>
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f9849c5c/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
index fc483eb..34d2542 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
@@ -21,15 +21,15 @@ package org.apache.samza.config;
 
 public class ZkConfig extends MapConfig {
   // Connection string for ZK, format: :<hostname>:<port>,..."
-  public static final String ZK_CONNECT = "coordinator.zk.connect";
-  public static final String ZK_SESSION_TIMEOUT_MS = "coordinator.zk.session-timeout-ms";
-  public static final String ZK_CONNECTION_TIMEOUT_MS = "coordinator.zk.connection-timeout-ms";
+  public static final String ZK_CONNECT = "job.coordinator.zk.connect";
+  public static final String ZK_SESSION_TIMEOUT_MS = "job.coordinator.zk.session.timeout.ms";
+  public static final String ZK_CONNECTION_TIMEOUT_MS = "job.coordinator.zk.connection.timeout.ms";
+  public static final String ZK_CONSENSUS_TIMEOUT_MS = "job.coordinator.zk.consensus.timeout.ms";
 
   public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 60000;
   public static final int DEFAULT_SESSION_TIMEOUT_MS = 30000;
-  public static final String ZK_BARRIER_TIMEOUT_MS = "coordinator.zk.barrier-timeout-ms";
-  public static final int DEFAULT_BARRIER_TIMEOUT_MS = 40000;
-
+  public static final int DEFAULT_CONSENSUS_TIMEOUT_MS = 40000;
+  
   public ZkConfig(Config config) {
     super(config);
   }
@@ -50,6 +50,6 @@ public class ZkConfig extends MapConfig {
   }
 
   public int getZkBarrierTimeoutMs() {
-    return getInt(ZK_BARRIER_TIMEOUT_MS, DEFAULT_BARRIER_TIMEOUT_MS);
+    return getInt(ZK_CONSENSUS_TIMEOUT_MS, DEFAULT_CONSENSUS_TIMEOUT_MS);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f9849c5c/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index 547e32b..9c91fd3 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -55,7 +55,7 @@ public class TestZkBarrierForVersionUpgrade {
     String processorId = "p1";
     Map<String, String> map = new HashMap<>();
     map.put(ZkConfig.ZK_CONNECT, testZkConnectionString);
-    map.put(ZkConfig.ZK_BARRIER_TIMEOUT_MS, "200");
+    map.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, "200");
     Config config = new MapConfig(map);
 
     CoordinationServiceFactory serviceFactory = new ZkCoordinationServiceFactory();