You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/12/24 01:04:04 UTC

[7/7] samza git commit: Fixing checkstyle errors

Fixing checkstyle errors


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4918e3ad
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4918e3ad
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4918e3ad

Branch: refs/heads/samza-standalone
Commit: 4918e3ad7c622a36f6c8d8dc8eb6c7778fdaac05
Parents: 0a2b63a
Author: navina <na...@apache.org>
Authored: Fri Dec 23 17:03:25 2016 -0800
Committer: navina <na...@apache.org>
Committed: Fri Dec 23 17:03:25 2016 -0800

----------------------------------------------------------------------
 .../org/apache/samza/config/JavaJobConfig.java  | 21 ++++++++-
 .../samza/config/JobCoordinatorConfig.java      | 23 ++++++++--
 .../java/org/apache/samza/config/ZkConfig.java  | 21 ++++++++-
 .../task/SimpleGroupByContainerCount.java       | 30 ++++++++++---
 .../SimpleGroupByContainerCountFactory.java     |  1 -
 .../leaderelection/LeaderElector.java           | 19 ++++++++
 .../processor/SamzaContainerController.java     | 41 ++++++++++++-----
 .../samza/zk/BarrierForVersionUpgrade.java      | 19 ++++++++
 .../samza/zk/ScheduleAfterDebounceTime.java     | 34 ++++++++++++---
 .../samza/zk/ZkBarrierForVersionUpgrade.java    | 39 +++++++++++++----
 .../java/org/apache/samza/zk/ZkController.java  | 27 +++++++++++-
 .../org/apache/samza/zk/ZkControllerImpl.java   | 36 ++++++++++++---
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 46 +++++++++++++-------
 .../samza/zk/ZkJobCoordinatorFactory.java       | 19 ++++++++
 .../java/org/apache/samza/zk/ZkKeyBuilder.java  | 28 ++++++++++--
 .../org/apache/samza/zk/ZkLeaderElector.java    | 23 +++++++++-
 .../java/org/apache/samza/zk/ZkListener.java    | 19 ++++++++
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 46 +++++++++++++++-----
 18 files changed, 413 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java
index c0747f0..24b9427 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.config;
 
 public class JavaJobConfig extends MapConfig {
@@ -5,7 +24,7 @@ public class JavaJobConfig extends MapConfig {
   private static final String JOB_ID = "job.id"; // streaming.job_id
   private static final String DEFAULT_JOB_ID = "1";
 
-  public JavaJobConfig (Config config) {
+  public JavaJobConfig(Config config) {
     super(config);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
index c8e496e..4bb66de 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
@@ -1,14 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.config;
 
 import com.google.common.base.Strings;
-import org.apache.samza.coordinator.JobCoordinatorFactory;
-import org.apache.samza.util.Util;
 
 public class JobCoordinatorConfig extends MapConfig {
   // TODO: Change this to job-coordinator.factory
   private static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory";
 
-  public JobCoordinatorConfig (Config config) {
+  public JobCoordinatorConfig(Config config) {
     super(config);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
index 973db42..31b1eda 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.config;
 
 public class ZkConfig extends MapConfig {
@@ -9,7 +28,7 @@ public class ZkConfig extends MapConfig {
   public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 60000;
   public static final int DEFAULT_SESSION_TIMEOUT_MS = 30000;
 
-  public ZkConfig (Config config) {
+  public ZkConfig(Config config) {
     super(config);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java
index 359c4ed..cf225c3 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java
@@ -1,5 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.container.grouper.task;
 
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -7,10 +30,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.TaskModel;
 
 
 public class SimpleGroupByContainerCount implements TaskNameGrouper {
@@ -18,6 +37,7 @@ public class SimpleGroupByContainerCount implements TaskNameGrouper {
   public SimpleGroupByContainerCount() {
     this.startContainerCount = 1;
   }
+
   public SimpleGroupByContainerCount(int containerCount) {
     if (containerCount <= 0) throw new IllegalArgumentException("Must have at least one container");
     this.startContainerCount = containerCount;
@@ -33,7 +53,7 @@ public class SimpleGroupByContainerCount implements TaskNameGrouper {
   }
 
   public Set<ContainerModel> group(Set<TaskModel> tasks, List<Integer> containersIds) {
-    if(containersIds == null)
+    if (containersIds == null)
       return this.group(tasks);
 
     int containerCount = containersIds.size();

http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java
index 02918f6..b1c6b92 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java
@@ -20,7 +20,6 @@ package org.apache.samza.container.grouper.task;
 
 
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java b/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java
index fc3cac9..2565ee2 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.coordinator.leaderelection;
 
 public interface LeaderElector {

http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
index fb227d0..d448d30 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.processor;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -38,12 +57,12 @@ public class SamzaContainerController {
    * Creates an instance of a controller for instantiating, starting and/or stopping {@link SamzaContainer}
    * Requests to execute a container are submitted to the {@link ExecutorService}
    *
-   * @param taskFactory Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or
-   *                    {@link org.apache.samza.task.AsyncStreamTask}
+   * @param taskFactory         Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or
+   *                            {@link org.apache.samza.task.AsyncStreamTask}
    * @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances
-   * @param metricsReporterMap Map of metric reporter name and {@link MetricsReporter} instance
+   * @param metricsReporterMap  Map of metric reporter name and {@link MetricsReporter} instance
    */
-  public SamzaContainerController (
+  public SamzaContainerController(
       Object taskFactory,
       long containerShutdownMs,
       String processorId,
@@ -62,14 +81,14 @@ public class SamzaContainerController {
   /**
    * Instantiates a container and submits to the executor. This method does not actually wait for the container to
    * fully start-up. For such a behavior, see {@link #awaitStart(long)}
-   *
+   * <p>
    * <b>Note:</b> <i>This method does not stop a currently running container, if any. It is left up to the caller to
    * ensure that the container has been stopped with stopContainer before invoking this method.</i>
    *
-   * @param containerModel {@link ContainerModel} instance to use for the current run of the Container
-   * @param config Complete configuration map used by the Samza job
+   * @param containerModel               {@link ContainerModel} instance to use for the current run of the Container
+   * @param config                       Complete configuration map used by the Samza job
    * @param maxChangelogStreamPartitions Max number of partitions expected in the changelog streams
-   * TODO: Try to get rid of maxChangelogStreamPartitions from method arguments
+   *                                     TODO: Try to get rid of maxChangelogStreamPartitions from method arguments
    */
   public void startContainer(ContainerModel containerModel, Config config, int maxChangelogStreamPartitions) {
     LocalityManager localityManager = null;
@@ -96,7 +115,7 @@ public class SamzaContainerController {
    *
    * @param timeoutMs Maximum time to wait, in milliseconds
    * @return {@code true}, if the container started within the specified wait time and {@code false} if the waiting
-   *          time elapsed
+   * time elapsed
    * @throws InterruptedException if the current thread is interrupted while waiting for container to start-up
    */
   public boolean awaitStart(long timeoutMs) throws InterruptedException {
@@ -107,14 +126,14 @@ public class SamzaContainerController {
    * Stops a running container, if any. Invoking this method multiple times does not have any side-effects.
    */
   public void stopContainer() {
-    if(container == null) {
+    if (container == null) {
       log.warn("Shutdown before a container was created.");
       return;
     }
 
     container.shutdown();
     try {
-      if(containerFuture != null)
+      if (containerFuture != null)
         containerFuture.get(containerShutdownMs, TimeUnit.MILLISECONDS);
     } catch (InterruptedException | ExecutionException e) {
       log.error("Ran into problems while trying to stop the container in the processor!", e);

http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
index 691aced..c2c4c23 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.zk;
 
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index e217dab..d174938 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -1,18 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.zk;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ScheduleAfterDebounceTime {
   public static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
-  public static final long timeoutMs = 1000*10;
+  public static final long TIMEOUT_MS = 1000 * 10;
 
   public static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange";
   public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
@@ -24,19 +45,18 @@ public class ScheduleAfterDebounceTime {
       new ThreadFactoryBuilder().setNameFormat("zk-debounce-thread-%d").setDaemon(true).build());
   private final Map<String, ScheduledFuture> futureHandles = new HashMap<>();
 
-  public ScheduleAfterDebounceTime () {
-
+  public ScheduleAfterDebounceTime() {
   }
 
-  synchronized public void scheduleAfterDebounceTime (String actionName, long debounceTimeMs, Runnable runnable) {//, final ReadyToCreateJobModelListener listener) {
+  synchronized public void scheduleAfterDebounceTime(String actionName, long debounceTimeMs, Runnable runnable) { //, final ReadyToCreateJobModelListener listener) {
     // check if this action has been scheduled already
     ScheduledFuture sf = futureHandles.get(actionName);
-    if(sf != null && !sf.isDone()) {
+    if (sf != null && !sf.isDone()) {
       LOG.info(">>>>>>>>>>>DEBOUNCE: cancel future for " + actionName);
       // attempt to cancel
-      if(! sf.cancel(false) ) {
+      if (!sf.cancel(false)) {
         try {
-          sf.get(timeoutMs, TimeUnit.MILLISECONDS);
+          sf.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
         } catch (Exception e) {
           // we ignore the exception
           LOG.warn("cancel for action " + actionName + " failed with ", e);

http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index 60a06da..524afed 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -1,6 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.zk;
 
 import java.util.List;
+
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.slf4j.Logger;
@@ -17,7 +37,7 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
 
   final private String barrierPrefix;
 
-  public ZkBarrierForVersionUpgrade( ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer) {
+  public ZkBarrierForVersionUpgrade(ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer) {
     this.zkUtils = zkUtils;
     keyBuilder = zkUtils.getKeyBuilder();
 
@@ -83,7 +103,7 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
       // Find out the event & Log
       boolean allIn = true;
 
-      if(currentChildren == null) {
+      if (currentChildren == null) {
         LOG.info("Got handleChildChange with null currentChildren");
         return;
       }
@@ -101,33 +121,34 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
 
 
       // check if all the names are in
-      for(String n : names) {
-        if(!currentChildren.contains(n)) {
+      for (String n : names) {
+        if (!currentChildren.contains(n)) {
           LOG.info("node " + n + " is still not in the list ");
           allIn = false;
           break;
         }
       }
-      if(allIn) {
+      if (allIn) {
         LOG.info("ALl nodes reached the barrier");
         callback.run(); // all the names have registered
       }
     }
   }
 
-  class ZkBarrierReachedHandler implements  IZkDataListener {
+  class ZkBarrierReachedHandler implements IZkDataListener {
     private final ScheduleAfterDebounceTime debounceTimer;
     private final String barrierPathDone;
     private final Runnable callback;
+
     public ZkBarrierReachedHandler(String barrierPathDone, ScheduleAfterDebounceTime debounceTimer, Runnable callback) {
       this.barrierPathDone = barrierPathDone;
-      this.callback =  callback;
+      this.callback = callback;
       this.debounceTimer = debounceTimer;
     }
 
     @Override
     public void handleDataChange(String dataPath, Object data)
-    throws Exception {
+        throws Exception {
       String done = (String) data;
       LOG.info("got notification about barrier path=" + barrierPathDone + "; done=" + done);
       if (done.equals(BARRIER_DONE)) {
@@ -140,7 +161,7 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
 
     @Override
     public void handleDataDeleted(String dataPath)
-    throws Exception {
+        throws Exception {
       LOG.warn("barrier done got deleted at " + dataPath);
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
index 20e55ab..76ff8d2 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
@@ -1,11 +1,34 @@
-package org.apache.samza.zk;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 
+package org.apache.samza.zk;
 
 public interface ZkController {
-  void register ();
+  void register();
+
   boolean isLeader();
+
   void notifyJobModelChange(String version);
+
   void stop();
+
   void listenToProcessorLiveness();
+
   String currentJobModelVersion();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
index fdd1f02..b00a3ee 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.zk;
 
 import org.I0Itec.zkclient.IZkChildListener;
@@ -18,7 +37,7 @@ public class ZkControllerImpl implements ZkController {
   private final ZkLeaderElector leaderElector;
   private final ScheduleAfterDebounceTime debounceTimer;
 
-  public ZkControllerImpl (String processorIdStr, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer, ZkListener zkListener) {
+  public ZkControllerImpl(String processorIdStr, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer, ZkListener zkListener) {
     this.processorIdStr = processorIdStr;
     this.zkUtils = zkUtils;
     this.zkListener = zkListener;
@@ -30,11 +49,10 @@ public class ZkControllerImpl implements ZkController {
 
   @Override
   public void register() {
-
     // TODO - make a loop here with some number of attempts.
     // possibly split into two method - becomeLeader() and becomeParticipant()
     boolean isLeader = leaderElector.tryBecomeLeader();
-    if(isLeader) {
+    if (isLeader) {
       listenToProcessorLiveness();
 
       //      zkUtils.subscribeToProcessorChange(zkProcessorChangeListener);
@@ -49,7 +67,7 @@ public class ZkControllerImpl implements ZkController {
 
   private void init() {
     ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
-    zkUtils.makeSurePersistentPathsExists(new String[] {
+    zkUtils.makeSurePersistentPathsExists(new String[]{
         keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder.getJobModelPathPrefix()});
   }
 
@@ -82,11 +100,13 @@ public class ZkControllerImpl implements ZkController {
   }
 
   // Only by Leader
-  class ZkProcessorChangeHandler  implements IZkChildListener {
+  class ZkProcessorChangeHandler implements IZkChildListener {
     private final ScheduleAfterDebounceTime debounceTimer;
+
     public ZkProcessorChangeHandler(ScheduleAfterDebounceTime debounceTimer) {
       this.debounceTimer = debounceTimer;
     }
+
     /**
      * Called when the children of the given path changed.
      *
@@ -106,11 +126,14 @@ public class ZkControllerImpl implements ZkController {
 
   class ZkJobModelVersionChangeHandler implements IZkDataListener {
     private final ScheduleAfterDebounceTime debounceTimer;
+
     public ZkJobModelVersionChangeHandler(ScheduleAfterDebounceTime debounceTimer) {
       this.debounceTimer = debounceTimer;
     }
+
     /**
      * called when job model version gets updated
+     *
      * @param dataPath
      * @param data
      * @throws Exception
@@ -123,6 +146,7 @@ public class ZkControllerImpl implements ZkController {
       debounceTimer
           .scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () -> notifyJobModelChange((String) data));
     }
+
     @Override
     public void handleDataDeleted(String dataPath) throws Exception {
       throw new SamzaException("version update path has been deleted!.");
@@ -130,7 +154,7 @@ public class ZkControllerImpl implements ZkController {
   }
 
   public void shutdown() {
-    if(debounceTimer != null)
+    if (debounceTimer != null)
       debounceTimer.stopScheduler();
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index f661547..e83b16b 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -1,17 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.zk;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.JobModelManager$;
-import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.processor.SamzaContainerController;
 import org.apache.samza.system.StreamMetadataCache;
@@ -22,7 +35,11 @@ import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * JobCoordinator for stand alone processor managed via Zookeeper.
@@ -60,12 +77,11 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
     barrier = new ZkBarrierForVersionUpgrade(zkUtils, debounceTimer); //should not have any state in it
 
 
-
     // TEMP for model generation
     //////////////////////////////// NEEDS TO BE REPLACED //////////////////////////////////////
     JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
     Map<String, SystemAdmin> systemAdmins = new HashMap<>();
-    for (String systemName: systemConfig.getSystemNames()) {
+    for (String systemName : systemConfig.getSystemNames()) {
       String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
       if (systemFactoryClassName == null) {
         log.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
@@ -75,11 +91,10 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
       systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config));
     }
 
-     streamMetadataCache = new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock
+    streamMetadataCache = new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock
         .instance());
 
 
-
     ////////////////////////////////////////////////////////////////////////////////////////////
   }
 
@@ -99,8 +114,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
   }
 
   @Override
-  public int getProcessorId()
-  {
+  public int getProcessorId() {
     return processorId;
   }
 
@@ -125,9 +139,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
     List<String> currentProcessors = zkUtils.getActiveProcessors();
 
     // get the current version
-    String currentJMVersion  = zkUtils.getJobModelVersion();
+    String currentJMVersion = zkUtils.getJobModelVersion();
     String nextJMVersion;
-    if(currentJMVersion == null)
+    if (currentJMVersion == null)
       nextJMVersion = "1";
     else
       nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1);
@@ -139,7 +153,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
     //JobModel jobModel = new JobModel(config, containers);
     StringBuilder sb = new StringBuilder();
     List<Integer> containerIds = new ArrayList<>();
-    for(String processor: currentProcessors){
+    for (String processor : currentProcessors) {
       String zkProcessorId = keyBuilder.parseContainerIdFromProcessorId(processor);
       sb.append(zkProcessorId).append(",");
       containerIds.add(Integer.valueOf(zkProcessorId));
@@ -163,7 +177,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
     log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion);
   }
 
-   //////////////////////////////////////////////////////////////////////////////////////////////
+  //////////////////////////////////////////////////////////////////////////////////////////////
   @Override
   public void onProcessorChange(List<String> processorIds) {
     // Reset debounce Timer

http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 90b0097..02db340 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.zk;
 
 import org.apache.samza.config.Config;

http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index 42d0c86..efe3349 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.zk;
 
 import org.apache.samza.SamzaException;
@@ -10,10 +29,11 @@ public class ZkKeyBuilder {
 
   public static final String JOBMODEL_VERSION_PATH = "jobModelVersion";
 
-  public ZkKeyBuilder () {
+  public ZkKeyBuilder() {
     this("");
   }
-  public ZkKeyBuilder (String pathPrefix) {
+
+  public ZkKeyBuilder(String pathPrefix) {
     this.pathPrefix = pathPrefix;
   }
 
@@ -23,12 +43,12 @@ public class ZkKeyBuilder {
 
   public static String parseIdFromPath(String path) {
     if (path != null)
-     return path.substring(path.indexOf(PROCESSOR_ID_PREFIX));
+      return path.substring(path.indexOf(PROCESSOR_ID_PREFIX));
     return null;
   }
 
   public static String parseContainerIdFromProcessorId(String prId) {
-    if(prId == null)
+    if (prId == null)
       throw new SamzaException("processor id is null");
 
     return prId.substring(prId.indexOf(PROCESSOR_ID_PREFIX) + PROCESSOR_ID_PREFIX.length());

http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index 8bffeb6..ecc0f0b 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -1,12 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.zk;
 
-import java.util.Arrays;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.apache.samza.SamzaException;
 import org.apache.samza.coordinator.leaderelection.LeaderElector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 
@@ -23,7 +42,7 @@ public class ZkLeaderElector implements LeaderElector {
   private String currentSubscription = null;
   private final Random random = new Random();
 
-  public ZkLeaderElector (String processorIdStr, ZkUtils zkUtils, ZkListener zkListener) {
+  public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, ZkListener zkListener) {
     this.processorIdStr = processorIdStr;
     this.zkUtils = zkUtils;
     this.keyBuilder = this.zkUtils.getKeyBuilder();

http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java b/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java
index 4a1c491..a0c69c6 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.zk;
 
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index de8c213..05100a5 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -1,6 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.zk;
 
-import java.io.IOException;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
@@ -17,6 +35,7 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collections;
@@ -75,8 +94,9 @@ public class ZkUtils {
   public ZkKeyBuilder getKeyBuilder() {
     return keyBuilder;
   }
+
   public void makeSurePersistentPathsExists(String[] paths) {
-    for(String path: paths) {
+    for (String path : paths) {
       if (!zkClient.exists(path)) {
         zkClient.createPersistent(path, true);
       }
@@ -119,13 +139,14 @@ public class ZkUtils {
       throw new SamzaException(e);
     }
   }
+
   public JobModel getJobModel(String jobModelVersion) {
     LOG.info("pid=" + processorId + "read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
     Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
     ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
     JobModel jm;
     try {
-      jm = mmapper.readValue((String)data, JobModel.class);
+      jm = mmapper.readValue((String) data, JobModel.class);
     } catch (IOException e) {
       throw new SamzaException("failed to read JobModel from ZK", e);
     }
@@ -140,23 +161,24 @@ public class ZkUtils {
 
   public void publishNewJobModelVersion(String oldVersion, String newVersion) {
     Stat stat = new Stat();
-    String currentVersion =  zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat);
+    String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat);
     LOG.info("pid=" + processorId + " publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat.getVersion() + ")");
-    if(currentVersion != null && !currentVersion.equals(oldVersion)) {
-      throw new SamzaException("Someone change JMVersion while Leader was generating: expected" + oldVersion  + ", got " + currentVersion);
+    if (currentVersion != null && !currentVersion.equals(oldVersion)) {
+      throw new SamzaException("Someone change JMVersion while Leader was generating: expected" + oldVersion + ", got " + currentVersion);
     }
     int dataVersion = stat.getVersion();
     stat = zkClient.writeDataReturnStat(keyBuilder.getJobModelVersionPath(), newVersion, dataVersion);
-    if(stat.getVersion() != dataVersion + 1)
+    if (stat.getVersion() != dataVersion + 1)
       throw new SamzaException("Someone changed data version of the JMVersion while Leader was generating a new one. current= " + dataVersion + ", old version = " + stat.getVersion());
 
     LOG.info("pid=" + processorId +
         " published new version: " + newVersion + "; expected dataVersion = " + dataVersion + "(" + stat.getVersion()
-            + ")");
+        + ")");
   }
 
   /**
    * subscribe for changes of JobModel version
+   *
    * @param dataListener describe this
    */
   public void subscribeToJobModelVersionChange(IZkDataListener dataListener) {
@@ -192,13 +214,13 @@ public class ZkUtils {
     }
     zkClient.close();
 
-    if(debounceTimer != null)
+    if (debounceTimer != null)
       debounceTimer.stopScheduler();
   }
 
   public void deleteRoot() {
     String rootPath = keyBuilder.getRootPath();
-    if(rootPath != null && !rootPath.isEmpty() && zkClient.exists(rootPath)) {
+    if (rootPath != null && !rootPath.isEmpty() && zkClient.exists(rootPath)) {
       LOG.info("pid=" + processorId + " Deleteing root: " + rootPath);
       zkClient.deleteRecursive(rootPath);
     }
@@ -206,6 +228,7 @@ public class ZkUtils {
 
   class ZkStateChangeHandler implements IZkStateListener {
     private final ScheduleAfterDebounceTime debounceTimer;
+
     public ZkStateChangeHandler(ScheduleAfterDebounceTime debounceTimer) {
       this.debounceTimer = debounceTimer;
     }
@@ -217,7 +240,8 @@ public class ZkUtils {
      * @throws Exception On any error.
      */
     @Override
-    public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {    }
+    public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
+    }
 
     /**
      * Called after the zookeeper session has expired and a new session has been created. You would have to re-create