You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/12/24 01:04:04 UTC
[7/7] samza git commit: Fixing checkstyle errors
Fixing checkstyle errors
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4918e3ad
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4918e3ad
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4918e3ad
Branch: refs/heads/samza-standalone
Commit: 4918e3ad7c622a36f6c8d8dc8eb6c7778fdaac05
Parents: 0a2b63a
Author: navina <na...@apache.org>
Authored: Fri Dec 23 17:03:25 2016 -0800
Committer: navina <na...@apache.org>
Committed: Fri Dec 23 17:03:25 2016 -0800
----------------------------------------------------------------------
.../org/apache/samza/config/JavaJobConfig.java | 21 ++++++++-
.../samza/config/JobCoordinatorConfig.java | 23 ++++++++--
.../java/org/apache/samza/config/ZkConfig.java | 21 ++++++++-
.../task/SimpleGroupByContainerCount.java | 30 ++++++++++---
.../SimpleGroupByContainerCountFactory.java | 1 -
.../leaderelection/LeaderElector.java | 19 ++++++++
.../processor/SamzaContainerController.java | 41 ++++++++++++-----
.../samza/zk/BarrierForVersionUpgrade.java | 19 ++++++++
.../samza/zk/ScheduleAfterDebounceTime.java | 34 ++++++++++++---
.../samza/zk/ZkBarrierForVersionUpgrade.java | 39 +++++++++++++----
.../java/org/apache/samza/zk/ZkController.java | 27 +++++++++++-
.../org/apache/samza/zk/ZkControllerImpl.java | 36 ++++++++++++---
.../org/apache/samza/zk/ZkJobCoordinator.java | 46 +++++++++++++-------
.../samza/zk/ZkJobCoordinatorFactory.java | 19 ++++++++
.../java/org/apache/samza/zk/ZkKeyBuilder.java | 28 ++++++++++--
.../org/apache/samza/zk/ZkLeaderElector.java | 23 +++++++++-
.../java/org/apache/samza/zk/ZkListener.java | 19 ++++++++
.../main/java/org/apache/samza/zk/ZkUtils.java | 46 +++++++++++++++-----
18 files changed, 413 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java
index c0747f0..24b9427 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.samza.config;
public class JavaJobConfig extends MapConfig {
@@ -5,7 +24,7 @@ public class JavaJobConfig extends MapConfig {
private static final String JOB_ID = "job.id"; // streaming.job_id
private static final String DEFAULT_JOB_ID = "1";
- public JavaJobConfig (Config config) {
+ public JavaJobConfig(Config config) {
super(config);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
index c8e496e..4bb66de 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
@@ -1,14 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.samza.config;
import com.google.common.base.Strings;
-import org.apache.samza.coordinator.JobCoordinatorFactory;
-import org.apache.samza.util.Util;
public class JobCoordinatorConfig extends MapConfig {
// TODO: Change this to job-coordinator.factory
private static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory";
- public JobCoordinatorConfig (Config config) {
+ public JobCoordinatorConfig(Config config) {
super(config);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
index 973db42..31b1eda 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.samza.config;
public class ZkConfig extends MapConfig {
@@ -9,7 +28,7 @@ public class ZkConfig extends MapConfig {
public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 60000;
public static final int DEFAULT_SESSION_TIMEOUT_MS = 30000;
- public ZkConfig (Config config) {
+ public ZkConfig(Config config) {
super(config);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java
index 359c4ed..cf225c3 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java
@@ -1,5 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.samza.container.grouper.task;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -7,10 +30,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.TaskModel;
public class SimpleGroupByContainerCount implements TaskNameGrouper {
@@ -18,6 +37,7 @@ public class SimpleGroupByContainerCount implements TaskNameGrouper {
public SimpleGroupByContainerCount() {
this.startContainerCount = 1;
}
+
public SimpleGroupByContainerCount(int containerCount) {
if (containerCount <= 0) throw new IllegalArgumentException("Must have at least one container");
this.startContainerCount = containerCount;
@@ -33,7 +53,7 @@ public class SimpleGroupByContainerCount implements TaskNameGrouper {
}
public Set<ContainerModel> group(Set<TaskModel> tasks, List<Integer> containersIds) {
- if(containersIds == null)
+ if (containersIds == null)
return this.group(tasks);
int containerCount = containersIds.size();
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java
index 02918f6..b1c6b92 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java
@@ -20,7 +20,6 @@ package org.apache.samza.container.grouper.task;
import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java b/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java
index fc3cac9..2565ee2 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.samza.coordinator.leaderelection;
public interface LeaderElector {
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
index fb227d0..d448d30 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.samza.processor;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -38,12 +57,12 @@ public class SamzaContainerController {
* Creates an instance of a controller for instantiating, starting and/or stopping {@link SamzaContainer}
* Requests to execute a container are submitted to the {@link ExecutorService}
*
- * @param taskFactory Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or
- * {@link org.apache.samza.task.AsyncStreamTask}
+ * @param taskFactory Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or
+ * {@link org.apache.samza.task.AsyncStreamTask}
* @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances
- * @param metricsReporterMap Map of metric reporter name and {@link MetricsReporter} instance
+ * @param metricsReporterMap Map of metric reporter name and {@link MetricsReporter} instance
*/
- public SamzaContainerController (
+ public SamzaContainerController(
Object taskFactory,
long containerShutdownMs,
String processorId,
@@ -62,14 +81,14 @@ public class SamzaContainerController {
/**
* Instantiates a container and submits to the executor. This method does not actually wait for the container to
* fully start-up. For such a behavior, see {@link #awaitStart(long)}
- *
+ * <p>
* <b>Note:</b> <i>This method does not stop a currently running container, if any. It is left up to the caller to
* ensure that the container has been stopped with stopContainer before invoking this method.</i>
*
- * @param containerModel {@link ContainerModel} instance to use for the current run of the Container
- * @param config Complete configuration map used by the Samza job
+ * @param containerModel {@link ContainerModel} instance to use for the current run of the Container
+ * @param config Complete configuration map used by the Samza job
* @param maxChangelogStreamPartitions Max number of partitions expected in the changelog streams
- * TODO: Try to get rid of maxChangelogStreamPartitions from method arguments
+ * TODO: Try to get rid of maxChangelogStreamPartitions from method arguments
*/
public void startContainer(ContainerModel containerModel, Config config, int maxChangelogStreamPartitions) {
LocalityManager localityManager = null;
@@ -96,7 +115,7 @@ public class SamzaContainerController {
*
* @param timeoutMs Maximum time to wait, in milliseconds
* @return {@code true}, if the container started within the specified wait time and {@code false} if the waiting
- * time elapsed
+ * time elapsed
* @throws InterruptedException if the current thread is interrupted while waiting for container to start-up
*/
public boolean awaitStart(long timeoutMs) throws InterruptedException {
@@ -107,14 +126,14 @@ public class SamzaContainerController {
* Stops a running container, if any. Invoking this method multiple times does not have any side-effects.
*/
public void stopContainer() {
- if(container == null) {
+ if (container == null) {
log.warn("Shutdown before a container was created.");
return;
}
container.shutdown();
try {
- if(containerFuture != null)
+ if (containerFuture != null)
containerFuture.get(containerShutdownMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
log.error("Ran into problems while trying to stop the container in the processor!", e);
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
index 691aced..c2c4c23 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.samza.zk;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index e217dab..d174938 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -1,18 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.samza.zk;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ScheduleAfterDebounceTime {
public static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
- public static final long timeoutMs = 1000*10;
+ public static final long TIMEOUT_MS = 1000 * 10;
public static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange";
public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
@@ -24,19 +45,18 @@ public class ScheduleAfterDebounceTime {
new ThreadFactoryBuilder().setNameFormat("zk-debounce-thread-%d").setDaemon(true).build());
private final Map<String, ScheduledFuture> futureHandles = new HashMap<>();
- public ScheduleAfterDebounceTime () {
-
+ public ScheduleAfterDebounceTime() {
}
- synchronized public void scheduleAfterDebounceTime (String actionName, long debounceTimeMs, Runnable runnable) {//, final ReadyToCreateJobModelListener listener) {
+ synchronized public void scheduleAfterDebounceTime(String actionName, long debounceTimeMs, Runnable runnable) { //, final ReadyToCreateJobModelListener listener) {
// check if this action has been scheduled already
ScheduledFuture sf = futureHandles.get(actionName);
- if(sf != null && !sf.isDone()) {
+ if (sf != null && !sf.isDone()) {
LOG.info(">>>>>>>>>>>DEBOUNCE: cancel future for " + actionName);
// attempt to cancel
- if(! sf.cancel(false) ) {
+ if (!sf.cancel(false)) {
try {
- sf.get(timeoutMs, TimeUnit.MILLISECONDS);
+ sf.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (Exception e) {
// we ignore the exception
LOG.warn("cancel for action " + actionName + " failed with ", e);
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index 60a06da..524afed 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -1,6 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.samza.zk;
import java.util.List;
+
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.slf4j.Logger;
@@ -17,7 +37,7 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
final private String barrierPrefix;
- public ZkBarrierForVersionUpgrade( ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer) {
+ public ZkBarrierForVersionUpgrade(ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer) {
this.zkUtils = zkUtils;
keyBuilder = zkUtils.getKeyBuilder();
@@ -83,7 +103,7 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
// Find out the event & Log
boolean allIn = true;
- if(currentChildren == null) {
+ if (currentChildren == null) {
LOG.info("Got handleChildChange with null currentChildren");
return;
}
@@ -101,33 +121,34 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
// check if all the names are in
- for(String n : names) {
- if(!currentChildren.contains(n)) {
+ for (String n : names) {
+ if (!currentChildren.contains(n)) {
LOG.info("node " + n + " is still not in the list ");
allIn = false;
break;
}
}
- if(allIn) {
+ if (allIn) {
LOG.info("ALl nodes reached the barrier");
callback.run(); // all the names have registered
}
}
}
- class ZkBarrierReachedHandler implements IZkDataListener {
+ class ZkBarrierReachedHandler implements IZkDataListener {
private final ScheduleAfterDebounceTime debounceTimer;
private final String barrierPathDone;
private final Runnable callback;
+
public ZkBarrierReachedHandler(String barrierPathDone, ScheduleAfterDebounceTime debounceTimer, Runnable callback) {
this.barrierPathDone = barrierPathDone;
- this.callback = callback;
+ this.callback = callback;
this.debounceTimer = debounceTimer;
}
@Override
public void handleDataChange(String dataPath, Object data)
- throws Exception {
+ throws Exception {
String done = (String) data;
LOG.info("got notification about barrier path=" + barrierPathDone + "; done=" + done);
if (done.equals(BARRIER_DONE)) {
@@ -140,7 +161,7 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
@Override
public void handleDataDeleted(String dataPath)
- throws Exception {
+ throws Exception {
LOG.warn("barrier done got deleted at " + dataPath);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
index 20e55ab..76ff8d2 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
@@ -1,11 +1,34 @@
-package org.apache.samza.zk;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
public interface ZkController {
- void register ();
+ void register();
+
boolean isLeader();
+
void notifyJobModelChange(String version);
+
void stop();
+
void listenToProcessorLiveness();
+
String currentJobModelVersion();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
index fdd1f02..b00a3ee 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.samza.zk;
import org.I0Itec.zkclient.IZkChildListener;
@@ -18,7 +37,7 @@ public class ZkControllerImpl implements ZkController {
private final ZkLeaderElector leaderElector;
private final ScheduleAfterDebounceTime debounceTimer;
- public ZkControllerImpl (String processorIdStr, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer, ZkListener zkListener) {
+ public ZkControllerImpl(String processorIdStr, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer, ZkListener zkListener) {
this.processorIdStr = processorIdStr;
this.zkUtils = zkUtils;
this.zkListener = zkListener;
@@ -30,11 +49,10 @@ public class ZkControllerImpl implements ZkController {
@Override
public void register() {
-
// TODO - make a loop here with some number of attempts.
// possibly split into two method - becomeLeader() and becomeParticipant()
boolean isLeader = leaderElector.tryBecomeLeader();
- if(isLeader) {
+ if (isLeader) {
listenToProcessorLiveness();
// zkUtils.subscribeToProcessorChange(zkProcessorChangeListener);
@@ -49,7 +67,7 @@ public class ZkControllerImpl implements ZkController {
private void init() {
ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
- zkUtils.makeSurePersistentPathsExists(new String[] {
+ zkUtils.makeSurePersistentPathsExists(new String[]{
keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder.getJobModelPathPrefix()});
}
@@ -82,11 +100,13 @@ public class ZkControllerImpl implements ZkController {
}
// Only by Leader
- class ZkProcessorChangeHandler implements IZkChildListener {
+ class ZkProcessorChangeHandler implements IZkChildListener {
private final ScheduleAfterDebounceTime debounceTimer;
+
public ZkProcessorChangeHandler(ScheduleAfterDebounceTime debounceTimer) {
this.debounceTimer = debounceTimer;
}
+
/**
* Called when the children of the given path changed.
*
@@ -106,11 +126,14 @@ public class ZkControllerImpl implements ZkController {
class ZkJobModelVersionChangeHandler implements IZkDataListener {
private final ScheduleAfterDebounceTime debounceTimer;
+
public ZkJobModelVersionChangeHandler(ScheduleAfterDebounceTime debounceTimer) {
this.debounceTimer = debounceTimer;
}
+
/**
* called when job model version gets updated
+ *
* @param dataPath
* @param data
* @throws Exception
@@ -123,6 +146,7 @@ public class ZkControllerImpl implements ZkController {
debounceTimer
.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () -> notifyJobModelChange((String) data));
}
+
@Override
public void handleDataDeleted(String dataPath) throws Exception {
throw new SamzaException("version update path has been deleted!.");
@@ -130,7 +154,7 @@ public class ZkControllerImpl implements ZkController {
}
public void shutdown() {
- if(debounceTimer != null)
+ if (debounceTimer != null)
debounceTimer.stopScheduler();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index f661547..e83b16b 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -1,17 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.samza.zk;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSystemConfig;
-import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.JobModelManager$;
-import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.processor.SamzaContainerController;
import org.apache.samza.system.StreamMetadataCache;
@@ -22,7 +35,11 @@ import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* JobCoordinator for stand alone processor managed via Zookeeper.
@@ -60,12 +77,11 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
barrier = new ZkBarrierForVersionUpgrade(zkUtils, debounceTimer); //should not have any state in it
-
// TEMP for model generation
//////////////////////////////// NEEDS TO BE REPLACED //////////////////////////////////////
JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
Map<String, SystemAdmin> systemAdmins = new HashMap<>();
- for (String systemName: systemConfig.getSystemNames()) {
+ for (String systemName : systemConfig.getSystemNames()) {
String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
if (systemFactoryClassName == null) {
log.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
@@ -75,11 +91,10 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config));
}
- streamMetadataCache = new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock
+ streamMetadataCache = new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock
.instance());
-
////////////////////////////////////////////////////////////////////////////////////////////
}
@@ -99,8 +114,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
}
@Override
- public int getProcessorId()
- {
+ public int getProcessorId() {
return processorId;
}
@@ -125,9 +139,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
List<String> currentProcessors = zkUtils.getActiveProcessors();
// get the current version
- String currentJMVersion = zkUtils.getJobModelVersion();
+ String currentJMVersion = zkUtils.getJobModelVersion();
String nextJMVersion;
- if(currentJMVersion == null)
+ if (currentJMVersion == null)
nextJMVersion = "1";
else
nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1);
@@ -139,7 +153,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
//JobModel jobModel = new JobModel(config, containers);
StringBuilder sb = new StringBuilder();
List<Integer> containerIds = new ArrayList<>();
- for(String processor: currentProcessors){
+ for (String processor : currentProcessors) {
String zkProcessorId = keyBuilder.parseContainerIdFromProcessorId(processor);
sb.append(zkProcessorId).append(",");
containerIds.add(Integer.valueOf(zkProcessorId));
@@ -163,7 +177,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion);
}
- //////////////////////////////////////////////////////////////////////////////////////////////
+ //////////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onProcessorChange(List<String> processorIds) {
// Reset debounce Timer
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 90b0097..02db340 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.samza.zk;
import org.apache.samza.config.Config;
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index 42d0c86..efe3349 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.samza.zk;
import org.apache.samza.SamzaException;
@@ -10,10 +29,11 @@ public class ZkKeyBuilder {
public static final String JOBMODEL_VERSION_PATH = "jobModelVersion";
- public ZkKeyBuilder () {
+ public ZkKeyBuilder() {
this("");
}
- public ZkKeyBuilder (String pathPrefix) {
+
+ public ZkKeyBuilder(String pathPrefix) {
this.pathPrefix = pathPrefix;
}
@@ -23,12 +43,12 @@ public class ZkKeyBuilder {
public static String parseIdFromPath(String path) {
if (path != null)
- return path.substring(path.indexOf(PROCESSOR_ID_PREFIX));
+ return path.substring(path.indexOf(PROCESSOR_ID_PREFIX));
return null;
}
public static String parseContainerIdFromProcessorId(String prId) {
- if(prId == null)
+ if (prId == null)
throw new SamzaException("processor id is null");
return prId.substring(prId.indexOf(PROCESSOR_ID_PREFIX) + PROCESSOR_ID_PREFIX.length());
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index 8bffeb6..ecc0f0b 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -1,12 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.samza.zk;
-import java.util.Arrays;
import org.I0Itec.zkclient.IZkDataListener;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.leaderelection.LeaderElector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
@@ -23,7 +42,7 @@ public class ZkLeaderElector implements LeaderElector {
private String currentSubscription = null;
private final Random random = new Random();
- public ZkLeaderElector (String processorIdStr, ZkUtils zkUtils, ZkListener zkListener) {
+ public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, ZkListener zkListener) {
this.processorIdStr = processorIdStr;
this.zkUtils = zkUtils;
this.keyBuilder = this.zkUtils.getKeyBuilder();
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java b/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java
index 4a1c491..a0c69c6 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.samza.zk;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index de8c213..05100a5 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -1,6 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.samza.zk;
-import java.io.IOException;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
@@ -17,6 +35,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
@@ -75,8 +94,9 @@ public class ZkUtils {
public ZkKeyBuilder getKeyBuilder() {
return keyBuilder;
}
+
public void makeSurePersistentPathsExists(String[] paths) {
- for(String path: paths) {
+ for (String path : paths) {
if (!zkClient.exists(path)) {
zkClient.createPersistent(path, true);
}
@@ -119,13 +139,14 @@ public class ZkUtils {
throw new SamzaException(e);
}
}
+
public JobModel getJobModel(String jobModelVersion) {
LOG.info("pid=" + processorId + "read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
JobModel jm;
try {
- jm = mmapper.readValue((String)data, JobModel.class);
+ jm = mmapper.readValue((String) data, JobModel.class);
} catch (IOException e) {
throw new SamzaException("failed to read JobModel from ZK", e);
}
@@ -140,23 +161,24 @@ public class ZkUtils {
public void publishNewJobModelVersion(String oldVersion, String newVersion) {
Stat stat = new Stat();
- String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat);
+ String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat);
LOG.info("pid=" + processorId + " publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat.getVersion() + ")");
- if(currentVersion != null && !currentVersion.equals(oldVersion)) {
- throw new SamzaException("Someone change JMVersion while Leader was generating: expected" + oldVersion + ", got " + currentVersion);
+ if (currentVersion != null && !currentVersion.equals(oldVersion)) {
+ throw new SamzaException("Someone change JMVersion while Leader was generating: expected" + oldVersion + ", got " + currentVersion);
}
int dataVersion = stat.getVersion();
stat = zkClient.writeDataReturnStat(keyBuilder.getJobModelVersionPath(), newVersion, dataVersion);
- if(stat.getVersion() != dataVersion + 1)
+ if (stat.getVersion() != dataVersion + 1)
throw new SamzaException("Someone changed data version of the JMVersion while Leader was generating a new one. current= " + dataVersion + ", old version = " + stat.getVersion());
LOG.info("pid=" + processorId +
" published new version: " + newVersion + "; expected dataVersion = " + dataVersion + "(" + stat.getVersion()
- + ")");
+ + ")");
}
/**
* subscribe for changes of JobModel version
+ *
* @param dataListener describe this
*/
public void subscribeToJobModelVersionChange(IZkDataListener dataListener) {
@@ -192,13 +214,13 @@ public class ZkUtils {
}
zkClient.close();
- if(debounceTimer != null)
+ if (debounceTimer != null)
debounceTimer.stopScheduler();
}
public void deleteRoot() {
String rootPath = keyBuilder.getRootPath();
- if(rootPath != null && !rootPath.isEmpty() && zkClient.exists(rootPath)) {
+ if (rootPath != null && !rootPath.isEmpty() && zkClient.exists(rootPath)) {
LOG.info("pid=" + processorId + " Deleteing root: " + rootPath);
zkClient.deleteRecursive(rootPath);
}
@@ -206,6 +228,7 @@ public class ZkUtils {
class ZkStateChangeHandler implements IZkStateListener {
private final ScheduleAfterDebounceTime debounceTimer;
+
public ZkStateChangeHandler(ScheduleAfterDebounceTime debounceTimer) {
this.debounceTimer = debounceTimer;
}
@@ -217,7 +240,8 @@ public class ZkUtils {
* @throws Exception On any error.
*/
@Override
- public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception { }
+ public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
+ }
/**
* Called after the zookeeper session has expired and a new session has been created. You would have to re-create