You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/05/02 01:56:29 UTC

samza git commit: SAMZA-1248 - Fix StandAlone barrier start list.

Repository: samza
Updated Branches:
  refs/heads/master 81b173246 -> ad1f16175


SAMZA-1248 - Fix StandAlone barrier start list.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ad1f1617
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ad1f1617
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ad1f1617

Branch: refs/heads/master
Commit: ad1f161751bbe69220dd2bb0da64be3e2f64d674
Parents: 81b1732
Author: Boris Shkolnik <bo...@apache.org>
Authored: Mon May 1 18:32:48 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Mon May 1 18:32:48 2017 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/samza/zk/ZkJobCoordinator.java  | 5 ++---
 samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java    | 2 +-
 .../src/test/java/org/apache/samza/zk/TestZkUtils.java       | 8 ++------
 3 files changed, 5 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ad1f1617/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 2535654..1ddedbc 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -63,8 +63,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private final CoordinationUtils coordinationUtils;
 
   private JobModel newJobModel;
-  private JobModel jobModel;
-
+ 
   public ZkJobCoordinator(String processorId, Config config, ScheduleAfterDebounceTime debounceTimer,
                           SamzaContainerController containerController) {
     this.debounceTimer = debounceTimer;
@@ -215,7 +214,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     }
     log.info("generate new job model: processorsIds: " + Arrays.toString(containerIds.toArray()));
 
-    jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
+    JobModel jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
         containerIds);
 
     log.info("pid=" + processorId + "Generated jobModel: " + jobModel);

http://git-wip-us.apache.org/repos/asf/samza/blob/ad1f1617/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index fee8405..be877a4 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -169,7 +169,7 @@ public class ZkUtils {
 
       for (String child : znodeIds) {
         String fullPath = String.format("%s/%s", processorPath, child);
-        processorIds.add(readProcessorData(fullPath));
+        processorIds.add(new ProcessorData(readProcessorData(fullPath)).getProcessorId());
       }
 
       LOG.info("Found these children - " + znodeIds);

http://git-wip-us.apache.org/repos/asf/samza/blob/ad1f1617/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index b8dc295..63e2361 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -114,12 +114,8 @@ public class TestZkUtils {
     l = zkUtils.getSortedActiveProcessorsIDs();
     Assert.assertEquals(2, l.size());
 
-    ProcessorData pd = new ProcessorData(l.get(0));
-    Assert.assertEquals(" ID1 didn't match", "1", pd.getProcessorId());
-    Assert.assertEquals(" Host1 didn't match", "host1", pd.getHost());
-    pd = new ProcessorData(l.get(1));
-    Assert.assertEquals(" ID2 didn't match", "2", pd.getProcessorId());
-    Assert.assertEquals(" Host2 didn't match", "host2", pd.getHost());
+    Assert.assertEquals(" ID1 didn't match", "1", l.get(0));
+    Assert.assertEquals(" ID2 didn't match", "2", l.get(1));
   }
   
   @Test