You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/03/29 17:44:59 UTC
[1/2] samza git commit: SAMZA-1151 - Coordination Service
Repository: samza
Updated Branches:
refs/heads/master 61cf4e4df -> 553ce33b1
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index b48bc70..fb31054 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -19,7 +19,9 @@
package org.apache.samza.zk;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -27,6 +29,12 @@ import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationServiceFactory;
+import org.apache.samza.coordinator.LeaderElectorListener;
import org.apache.samza.testUtils.EmbeddedZookeeper;
import org.junit.After;
import org.junit.AfterClass;
@@ -34,12 +42,15 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestZkLeaderElector {
+ private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(TestZkLeaderElector.class);
private static EmbeddedZookeeper zkServer = null;
private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
@@ -47,6 +58,7 @@ public class TestZkLeaderElector {
private ZkUtils testZkUtils = null;
private static final int SESSION_TIMEOUT_MS = 20000;
private static final int CONNECTION_TIMEOUT_MS = 10000;
+ private final CoordinationServiceFactory factory = new ZkCoordinationServiceFactory();
@BeforeClass
public static void setup() throws InterruptedException {
@@ -58,7 +70,7 @@ public class TestZkLeaderElector {
public void testSetup() {
testZkConnectionString = "127.0.0.1:" + zkServer.getPort();
try {
- testZkUtils = getZkUtilsWithNewClient();
+ testZkUtils = getZkUtilsWithNewClient("testProcessorId");
} catch (Exception e) {
Assert.fail("Client connection setup failed. Aborting tests..");
}
@@ -96,18 +108,22 @@ public class TestZkLeaderElector {
when(mockZkUtils.registerProcessorAndGetId(any())).
thenReturn(KEY_BUILDER.getProcessorsPath() + "/0000000000");
when(mockZkUtils.getSortedActiveProcessors()).thenReturn(activeProcessors);
+ Mockito.doNothing().when(mockZkUtils).makeSurePersistentPathsExists(any(String[].class));
+ ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
+ when(kb.getProcessorsPath()).thenReturn("");
+ when(mockZkUtils.getKeyBuilder()).thenReturn(kb);
+
+ ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils, null);
BooleanResult isLeader = new BooleanResult();
- ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils,
- new ZkLeaderElector.ZkLeaderElectorListener() {
- @Override
- public void onBecomingLeader() {
- isLeader.res = true;
- }
+
+ leaderElector.tryBecomeLeader(new LeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader.res = true;
}
- );
- leaderElector.tryBecomeLeader();
- Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader.res, 2, 100));
+ });
+ Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader.res, 2, 100));
}
@Test
@@ -115,22 +131,37 @@ public class TestZkLeaderElector {
String processorId = "1";
ZkUtils mockZkUtils = mock(ZkUtils.class);
when(mockZkUtils.getSortedActiveProcessors()).thenReturn(new ArrayList<String>());
+ Mockito.doNothing().when(mockZkUtils).makeSurePersistentPathsExists(any(String[].class));
+
+ ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
+ when(kb.getProcessorsPath()).thenReturn("");
+ when(mockZkUtils.getKeyBuilder()).thenReturn(kb);
+
+ ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils, null);
- ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils,
- new ZkLeaderElector.ZkLeaderElectorListener() {
- @Override
- public void onBecomingLeader() {
- }
- }
- );
try {
- leaderElector.tryBecomeLeader();
+ leaderElector.tryBecomeLeader(new LeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ }
+ });
Assert.fail("Was expecting leader election to fail!");
} catch (SamzaException e) {
// No-op Expected
}
}
+ private CoordinationUtils getZkCoordinationService(String groupId, String processorId) {
+
+ Map<String, String> map = new HashMap<>();
+ map.put(ZkConfig.ZK_CONNECT, testZkConnectionString);
+ Config config = new MapConfig(map);
+
+ CoordinationUtils coordinationUtils = factory.getCoordinationService(groupId, processorId, config);
+
+ return coordinationUtils;
+ }
+
/**
* Test starts 3 processors and verifies the state of the Zk tree after all processors participate in LeaderElection
*/
@@ -139,50 +170,49 @@ public class TestZkLeaderElector {
BooleanResult isLeader1 = new BooleanResult();
BooleanResult isLeader2 = new BooleanResult();
BooleanResult isLeader3 = new BooleanResult();
+
+
// Processor-1
- ZkUtils zkUtils1 = getZkUtilsWithNewClient();
- ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1,
- new ZkLeaderElector.ZkLeaderElectorListener() {
- @Override
- public void onBecomingLeader() {
- isLeader1.res = true;
- }
- }
- );
+ ZkUtils zkUtils1 = getZkUtilsWithNewClient("1");
+ ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, null);
// Processor-2
- ZkUtils zkUtils2 = getZkUtilsWithNewClient();
- ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2,
- new ZkLeaderElector.ZkLeaderElectorListener() {
- @Override
- public void onBecomingLeader() {
- isLeader2.res = true;
- }
- }
- );
+ ZkUtils zkUtils2 = getZkUtilsWithNewClient("2");
+ ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2, null);
+
// Processor-3
- ZkUtils zkUtils3 = getZkUtilsWithNewClient();
- ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3,
- new ZkLeaderElector.ZkLeaderElectorListener() {
- @Override
- public void onBecomingLeader() {
- isLeader3.res = true;
- }
- });
+ ZkUtils zkUtils3 = getZkUtilsWithNewClient("3");
+ ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3, null);
Assert.assertEquals(0, testZkUtils.getSortedActiveProcessors().size());
- leaderElector1.tryBecomeLeader();
- leaderElector2.tryBecomeLeader();
- leaderElector3.tryBecomeLeader();
+ leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader1.res = true;
+ }
+ });
+ leaderElector2.tryBecomeLeader(new LeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader2.res = true;
+ }
+ });
+ leaderElector3.tryBecomeLeader(new LeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader3.res = true;
+ }
+ });
- Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
- Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
- Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
+ Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 100));
+ Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100));
+ Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
Assert.assertEquals(3, testZkUtils.getSortedActiveProcessors().size());
+
// Clean up
zkUtils1.close();
zkUtils2.close();
@@ -211,104 +241,102 @@ public class TestZkLeaderElector {
// Processor-1
- ZkUtils zkUtils1 = getZkUtilsWithNewClient();
+ ZkUtils zkUtils1 = getZkUtilsWithNewClient("processor1");
zkUtils1.registerProcessorAndGetId("processor1");
- ZkLeaderElector leaderElector1 = new ZkLeaderElector(
- "1",
- zkUtils1,
- new ZkLeaderElector.ZkLeaderElectorListener() {
- @Override
- public void onBecomingLeader() {
- isLeader1.res = true;
- }
- },
- new IZkDataListener() {
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
- }
-
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- count.incrementAndGet();
- }
- });
+ ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", zkUtils1, null);
+ leaderElector1.setPreviousProcessorChangeListener(new IZkDataListener() {
+ @Override
+ public void handleDataChange(String dataPath, Object data)
+ throws Exception {
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath)
+ throws Exception {
+ count.incrementAndGet();
+ }
+ });
// Processor-2
- ZkUtils zkUtils2 = getZkUtilsWithNewClient();
+ ZkUtils zkUtils2 = getZkUtilsWithNewClient("processor2");
final String path2 = zkUtils2.registerProcessorAndGetId("processor2");
- ZkLeaderElector leaderElector2 = new ZkLeaderElector(
- "2",
- zkUtils2,
- new ZkLeaderElector.ZkLeaderElectorListener() {
- @Override
- public void onBecomingLeader() {
- isLeader2.res = true;
- }
- },
- new IZkDataListener() {
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
-
- }
-
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path2);
- Assert.assertNotNull(registeredIdStr);
-
- String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath);
- Assert.assertNotNull(predecessorIdStr);
-
- try {
- int selfId = Integer.parseInt(registeredIdStr);
- int predecessorId = Integer.parseInt(predecessorIdStr);
- Assert.assertEquals(1, selfId - predecessorId);
- } catch (Exception e) {
- System.out.println(e.getMessage());
- }
- count.incrementAndGet();
- electionLatch.countDown();
- }
- });
+ ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", zkUtils2, null);
+
+ leaderElector2.setPreviousProcessorChangeListener(new IZkDataListener() {
+ @Override
+ public void handleDataChange(String dataPath, Object data)
+ throws Exception {
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath)
+ throws Exception {
+ String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path2);
+ Assert.assertNotNull(registeredIdStr);
+
+ String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath);
+ Assert.assertNotNull(predecessorIdStr);
+
+ try {
+ int selfId = Integer.parseInt(registeredIdStr);
+ int predecessorId = Integer.parseInt(predecessorIdStr);
+ Assert.assertEquals(1, selfId - predecessorId);
+ } catch (Exception e) {
+ LOG.error(e.getLocalizedMessage());
+ }
+ count.incrementAndGet();
+ electionLatch.countDown();
+ }
+ });
// Processor-3
- ZkUtils zkUtils3 = getZkUtilsWithNewClient();
+ ZkUtils zkUtils3 = getZkUtilsWithNewClient("processor3");
zkUtils3.registerProcessorAndGetId("processor3");
- ZkLeaderElector leaderElector3 = new ZkLeaderElector(
- "3",
- zkUtils3,
- new ZkLeaderElector.ZkLeaderElectorListener() {
- @Override
- public void onBecomingLeader() {
- isLeader3.res = true;
- }
- },
- new IZkDataListener() {
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
-
- }
-
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- count.incrementAndGet();
- }
- });
+ ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", zkUtils3, null);
+
+ leaderElector3.setPreviousProcessorChangeListener(new IZkDataListener() {
+ @Override
+ public void handleDataChange(String dataPath, Object data)
+ throws Exception {
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath)
+ throws Exception {
+ count.incrementAndGet();
+ }
+ });
// Join Leader Election
- leaderElector1.tryBecomeLeader();
- leaderElector2.tryBecomeLeader();
- leaderElector3.tryBecomeLeader();
- Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
- Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
- Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
+ leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader1.res = true;
+ }
+ });
+ leaderElector2.tryBecomeLeader(new LeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader2.res = true;
+ }
+ });
+ leaderElector3.tryBecomeLeader(new LeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader3.res = true;
+ }
+ });
+
+ Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 100));
+ Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100));
+ Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
Assert.assertTrue(leaderElector1.amILeader());
Assert.assertFalse(leaderElector2.amILeader());
Assert.assertFalse(leaderElector3.amILeader());
- List<String> currentActiveProcessors = testZkUtils.getSortedActiveProcessors();
+ List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessors();
Assert.assertEquals(3, currentActiveProcessors.size());
// Leader Failure
@@ -322,11 +350,12 @@ public class TestZkLeaderElector {
}
Assert.assertEquals(1, count.get());
- Assert.assertEquals(currentActiveProcessors, testZkUtils.getSortedActiveProcessors());
+ Assert.assertEquals(currentActiveProcessors, zkUtils2.getSortedActiveProcessors());
// Clean up
zkUtils2.close();
zkUtils3.close();
+
}
/**
@@ -347,100 +376,101 @@ public class TestZkLeaderElector {
BooleanResult isLeader3 = new BooleanResult();
// Processor-1
- ZkUtils zkUtils1 = getZkUtilsWithNewClient();
+ ZkUtils zkUtils1 = getZkUtilsWithNewClient("processor1");
zkUtils1.registerProcessorAndGetId("processor1");
- ZkLeaderElector leaderElector1 = new ZkLeaderElector(
- "1",
- zkUtils1,
- new ZkLeaderElector.ZkLeaderElectorListener() {
- @Override
- public void onBecomingLeader() {
- isLeader1.res = true;
- }
- },
- new IZkDataListener() {
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
-
- }
-
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- count.incrementAndGet();
- }
- });
+ ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", zkUtils1, null);
+
+ leaderElector1.setPreviousProcessorChangeListener(new IZkDataListener() {
+ @Override
+ public void handleDataChange(String dataPath, Object data)
+ throws Exception {
+
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath)
+ throws Exception {
+ count.incrementAndGet();
+ }
+ });
+
// Processor-2
- ZkUtils zkUtils2 = getZkUtilsWithNewClient();
+ ZkUtils zkUtils2 = getZkUtilsWithNewClient("processor2");
zkUtils2.registerProcessorAndGetId("processor2");
- ZkLeaderElector leaderElector2 = new ZkLeaderElector(
- "2",
- zkUtils2,
- new ZkLeaderElector.ZkLeaderElectorListener() {
- @Override
- public void onBecomingLeader() {
- isLeader2.res = true;
- }
- },
- new IZkDataListener() {
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
-
- }
-
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- count.incrementAndGet();
- }
- });
+ ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", zkUtils2, null);
+
+ leaderElector2.setPreviousProcessorChangeListener(new IZkDataListener() {
+ @Override
+ public void handleDataChange(String dataPath, Object data)
+ throws Exception {
+
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath)
+ throws Exception {
+ count.incrementAndGet();
+ }
+ });
// Processor-3
- ZkUtils zkUtils3 = getZkUtilsWithNewClient();
+ ZkUtils zkUtils3 = getZkUtilsWithNewClient("processor3");
final String path3 = zkUtils3.registerProcessorAndGetId("processor3");
- ZkLeaderElector leaderElector3 = new ZkLeaderElector(
- "3",
- zkUtils3,
- new ZkLeaderElector.ZkLeaderElectorListener() {
- @Override
- public void onBecomingLeader() {
- isLeader3.res = true;
- }
- },
- new IZkDataListener() {
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
-
- }
-
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path3);
- Assert.assertNotNull(registeredIdStr);
-
- String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath);
- Assert.assertNotNull(predecessorIdStr);
-
- try {
- int selfId = Integer.parseInt(registeredIdStr);
- int predecessorId = Integer.parseInt(predecessorIdStr);
- Assert.assertEquals(1, selfId - predecessorId);
- } catch (Exception e) {
- Assert.fail("Exception in LeaderElectionListener!");
- }
- count.incrementAndGet();
- electionLatch.countDown();
- }
- });
+ ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", zkUtils3, null);
+
+ leaderElector3.setPreviousProcessorChangeListener(new IZkDataListener() {
+ @Override
+ public void handleDataChange(String dataPath, Object data)
+ throws Exception {
+
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath)
+ throws Exception {
+ String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path3);
+ Assert.assertNotNull(registeredIdStr);
+
+ String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath);
+ Assert.assertNotNull(predecessorIdStr);
+
+ try {
+ int selfId = Integer.parseInt(registeredIdStr);
+ int predecessorId = Integer.parseInt(predecessorIdStr);
+ Assert.assertEquals(1, selfId - predecessorId);
+ } catch (Exception e) {
+ Assert.fail("Exception in LeaderElectionListener!");
+ }
+ count.incrementAndGet();
+ electionLatch.countDown();
+ }
+ });
// Join Leader Election
- leaderElector1.tryBecomeLeader();
- leaderElector2.tryBecomeLeader();
- leaderElector3.tryBecomeLeader();
- Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
- Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
- Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
-
- List<String> currentActiveProcessors = testZkUtils.getSortedActiveProcessors();
+ leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader1.res = true;
+ }
+ });
+ leaderElector2.tryBecomeLeader(new LeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader2.res = true;
+ }
+ });
+ leaderElector3.tryBecomeLeader(new LeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader3.res = true;
+ }
+ });
+ Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 100));
+ Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100));
+ Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
+
+ List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessors();
Assert.assertEquals(3, currentActiveProcessors.size());
zkUtils2.close();
@@ -453,7 +483,7 @@ public class TestZkLeaderElector {
}
Assert.assertEquals(1, count.get());
- Assert.assertEquals(currentActiveProcessors, testZkUtils.getSortedActiveProcessors());
+ Assert.assertEquals(currentActiveProcessors, zkUtils1.getSortedActiveProcessors());
// Clean up
zkUtils1.close();
@@ -465,43 +495,43 @@ public class TestZkLeaderElector {
BooleanResult isLeader1 = new BooleanResult();
BooleanResult isLeader2 = new BooleanResult();
// Processor-1
- ZkLeaderElector leaderElector1 = new ZkLeaderElector(
- "1",
- getZkUtilsWithNewClient(),
- new ZkLeaderElector.ZkLeaderElectorListener() {
- @Override
- public void onBecomingLeader() {
- isLeader1.res = true;
- }
- });
+
+ ZkUtils zkUtils1 = getZkUtilsWithNewClient("1");
+ ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, null);
// Processor-2
- ZkLeaderElector leaderElector2 = new ZkLeaderElector(
- "2",
- getZkUtilsWithNewClient(),
- new ZkLeaderElector.ZkLeaderElectorListener() {
- @Override
- public void onBecomingLeader() {
- isLeader2.res = true;
- }
- });
+ ZkUtils zkUtils2 = getZkUtilsWithNewClient("2");
+ ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2, null);
// Before Leader Election
Assert.assertFalse(leaderElector1.amILeader());
Assert.assertFalse(leaderElector2.amILeader());
- leaderElector1.tryBecomeLeader();
- leaderElector2.tryBecomeLeader();
+ leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader1.res = true;
+ }
+ });
+ leaderElector2.tryBecomeLeader(new LeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ isLeader2.res = true;
+ }
+ });
// After Leader Election
Assert.assertTrue(leaderElector1.amILeader());
Assert.assertFalse(leaderElector2.amILeader());
+
+ zkUtils1.close();
+ zkUtils2.close();
}
- private ZkUtils getZkUtilsWithNewClient() {
+ private ZkUtils getZkUtilsWithNewClient(String processorId) {
ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
return new ZkUtils(
- "processorId1",
+ processorId,
KEY_BUILDER,
ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
CONNECTION_TIMEOUT_MS);
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
new file mode 100644
index 0000000..ec7e830
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationServiceFactory;
+import org.apache.samza.coordinator.Latch;
+import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+
+public class TestZkProcessorLatch {
+ private static EmbeddedZookeeper zkServer = null;
+ private static String zkConnectionString;
+ private final CoordinationServiceFactory factory = new ZkCoordinationServiceFactory();
+ private CoordinationUtils coordinationUtils;
+
+ @BeforeClass
+ public static void setup() throws InterruptedException {
+ zkServer = new EmbeddedZookeeper();
+ zkServer.setup();
+
+ zkConnectionString = "localhost:" + zkServer.getPort();
+ System.out.println("ZK port = " + zkServer.getPort());
+ }
+
+ @Before
+ public void testSetup() {
+ String groupId = "group1";
+ String processorId = "p1";
+ Map<String, String> map = new HashMap<>();
+ map.put(ZkConfig.ZK_CONNECT, zkConnectionString);
+ Config config = new MapConfig(map);
+
+
+ coordinationUtils = factory.getCoordinationService(groupId, processorId, config);
+ coordinationUtils.reset();
+ }
+
+ @After
+ public void testTearDown() {
+ }
+
+ @AfterClass
+ public static void teardown() {
+ zkServer.teardown();
+ }
+
+ @Test
+ public void testSingleLatch1() {
+ System.out.println("Started 1");
+ int latchSize = 1;
+ String latchId = "l2";
+ ExecutorService pool = Executors.newFixedThreadPool(3);
+ Future f1 = pool.submit(
+ () -> {
+ Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+ latch.countDown();
+ try {
+ latch.await(100000, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ Assert.fail("await timed out. " + e.getLocalizedMessage());
+ }
+ });
+
+ try {
+ f1.get(30000, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ Assert.fail("failed to get future." + e.getLocalizedMessage());
+ }
+ pool.shutdownNow();
+ }
+
+ @Test
+ public void testSingleLatch2() {
+ System.out.println("Started 1");
+ int latchSize = 1;
+ String latchId = "l2";
+
+ ExecutorService pool = Executors.newFixedThreadPool(3);
+ Future f1 = pool.submit(
+ () -> {
+ Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+ //latch.countDown(); only one thread counts down
+ try {
+ latch.await(100000, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ Assert.fail("await timed out. " + e.getLocalizedMessage());
+ }
+ });
+
+ Future f2 = pool.submit(
+ () -> {
+ Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+ latch.countDown();
+ try {
+ latch.await(100000, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ Assert.fail("await timed out. " + e.getLocalizedMessage());
+ }
+ });
+
+ try {
+ f1.get(30000, TimeUnit.MILLISECONDS);
+ f2.get(30000, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ Assert.fail("failed to get future." + e.getLocalizedMessage());
+ }
+ pool.shutdownNow();
+ }
+
+ @Test
+ public void testNSizeLatch() {
+ System.out.println("Started N");
+ String latchId = "l1";
+ int latchSize = 3;
+
+ ExecutorService pool = Executors.newFixedThreadPool(3);
+ Future f1 = pool.submit(
+ () -> {
+ Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+ latch.countDown();
+ try {
+ latch.await(100000, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ Assert.fail("await timed out " + e.getLocalizedMessage());
+ }
+ });
+ Future f2 = pool.submit(
+ () -> {
+ Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+ latch.countDown();
+ try {
+ latch.await(100000, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ Assert.fail("await timed out. " + e.getLocalizedMessage());
+ }
+ });
+ Future f3 = pool.submit(
+ () -> {
+ Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+ latch.countDown();
+ try {
+ latch.await(100000, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ Assert.fail("await timed out. " + e.getLocalizedMessage());
+ }
+ });
+
+ try {
+ f1.get(300, TimeUnit.MILLISECONDS);
+ f2.get(300, TimeUnit.MILLISECONDS);
+ f3.get(300, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ Assert.fail("failed to get future. " + e.getLocalizedMessage());
+ }
+ }
+
+ @Test
+ public void testLatchExpires() {
+ System.out.println("Started expiring");
+ String latchId = "l4";
+
+ int latchSize = 3;
+
+ ExecutorService pool = Executors.newFixedThreadPool(3);
+ Future f1 = pool.submit(
+ () -> {
+ Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+ latch.countDown();
+ try {
+ latch.await(100000, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ Assert.fail("await timed out. " + e.getLocalizedMessage());
+ }
+ });
+ Future f2 = pool.submit(
+ () -> {
+ Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+ latch.countDown();
+ try {
+ latch.await(100000, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ Assert.fail("await timed out. " + e.getLocalizedMessage());
+ }
+ });
+ Future f3 = pool.submit(
+ () -> {
+ Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+ // This processor never completes its task
+ //latch.countDown();
+ try {
+ latch.await(100000, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ Assert.fail("await timed out. " + e.getLocalizedMessage());
+ }
+ });
+
+ try {
+ f1.get(300, TimeUnit.MILLISECONDS);
+ f2.get(300, TimeUnit.MILLISECONDS);
+ f3.get(300, TimeUnit.MILLISECONDS);
+ Assert.fail("Latch should've timeout.");
+ } catch (Exception e) {
+ f1.cancel(true);
+ f2.cancel(true);
+ f3.cancel(true);
+ // expected
+ }
+ pool.shutdownNow();
+ }
+
+ @Test
+ public void testSingleCountdown() {
+ System.out.println("Started single countdown");
+ String latchId = "l1";
+ int latchSize = 3;
+
+ ExecutorService pool = Executors.newFixedThreadPool(3);
+ // Only one thread invokes countDown
+ Future f1 = pool.submit(
+ () -> {
+ Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+ latch.countDown();
+ TestZkUtils.sleepMs(100);
+ latch.countDown();
+ TestZkUtils.sleepMs(100);
+ latch.countDown();
+ try {
+ latch.await(100000, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ Assert.fail("await timed out. " + e.getLocalizedMessage());
+ }
+ });
+ Future f2 = pool.submit(
+ () -> {
+ Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+ try {
+ latch.await(100000, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ Assert.fail("await timed out. " + e.getLocalizedMessage());
+ }
+ });
+ Future f3 = pool.submit(
+ () -> {
+ Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+ try {
+ latch.await(100000, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ Assert.fail("await timed out. " + e.getLocalizedMessage());
+ }
+ });
+ try {
+ f1.get(600, TimeUnit.MILLISECONDS);
+ f2.get(600, TimeUnit.MILLISECONDS);
+ f3.get(600, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ Assert.fail("Failed to get.");
+ }
+ pool.shutdownNow();
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
index a1ad363..2c44aea 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
@@ -19,6 +19,16 @@
package org.apache.samza.test.processor;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -28,6 +38,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
import org.apache.samza.processor.StreamProcessor;
import org.apache.samza.task.AsyncStreamTaskAdapter;
import org.apache.samza.task.AsyncStreamTaskFactory;
@@ -37,17 +48,6 @@ import org.apache.samza.test.StandaloneTestUtils;
import org.junit.Assert;
import org.junit.Test;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
import static org.apache.samza.test.processor.IdentityStreamTask.endLatch;
public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
@@ -144,21 +144,14 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
private Map<String, String> createConfigs(String testSystem, String inputTopic, String outputTopic, int messageCount) {
Map<String, String> configs = new HashMap<>();
configs.putAll(
- StandaloneTestUtils.getStandaloneConfigs(
- "test-job",
- "org.apache.samza.test.processor.IdentityStreamTask"));
- configs.putAll(
- StandaloneTestUtils.getKafkaSystemConfigs(
- testSystem,
- bootstrapServers(),
- zkConnect(),
- null,
- StandaloneTestUtils.SerdeAlias.STRING,
- true));
+ StandaloneTestUtils.getStandaloneConfigs("test-job", "org.apache.samza.test.processor.IdentityStreamTask"));
+ configs.putAll(StandaloneTestUtils.getKafkaSystemConfigs(testSystem, bootstrapServers(), zkConnect(), null,
+ StandaloneTestUtils.SerdeAlias.STRING, true));
configs.put("task.inputs", String.format("%s.%s", testSystem, inputTopic));
configs.put("app.messageCount", String.valueOf(messageCount));
configs.put("app.outputTopic", outputTopic);
configs.put("app.outputSystem", testSystem);
+ configs.put(ZkConfig.ZK_CONNECT, zkConnect());
return configs;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 5de30d8..417ada4 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
include \
'samza-api',
'samza-elasticsearch',
[2/2] samza git commit: SAMZA-1151 - Coordination Service
Posted by na...@apache.org.
SAMZA-1151 - Coordination Service
Author: Boris Shkolnik <bo...@apache.org>
Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Author: Boris Shkolnik <bs...@linkedin.com>
Reviewers: Xinyu Liu <xi...@apache.org>, Navina Ramesh <na...@apache.org>
Closes #91 from sborya/CoordinationService
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/553ce33b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/553ce33b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/553ce33b
Branch: refs/heads/master
Commit: 553ce33b1ad993f1b62e8a0f735287feffd433e4
Parents: 61cf4e4
Author: Boris Shkolnik <bo...@apache.org>
Authored: Wed Mar 29 10:44:49 2017 -0700
Committer: navina <na...@apache.org>
Committed: Wed Mar 29 10:44:49 2017 -0700
----------------------------------------------------------------------
checkstyle/checkstyle-suppressions.xml | 31 ++
checkstyle/checkstyle.xml | 22 +-
.../autoscaling/deployer/ConfigManager.java | 2 +-
.../samza/config/JobCoordinatorConfig.java | 11 +
.../java/org/apache/samza/config/ZkConfig.java | 6 +
.../coordinator/BarrierForVersionUpgrade.java | 47 ++
.../coordinator/CoordinationServiceFactory.java | 36 ++
.../samza/coordinator/CoordinationUtils.java | 44 ++
.../coordinator/JobCoordinatorFactory.java | 4 +-
.../apache/samza/coordinator/LeaderElector.java | 54 ++
.../coordinator/LeaderElectorListener.java | 27 +
.../leaderelection/LeaderElector.java | 46 --
.../apache/samza/processor/StreamProcessor.java | 10 +-
.../StandaloneJobCoordinatorFactory.java | 4 +-
.../samza/zk/BarrierForVersionUpgrade.java | 41 --
.../samza/zk/ZkBarrierForVersionUpgrade.java | 76 +--
.../org/apache/samza/zk/ZkControllerImpl.java | 17 +-
.../samza/zk/ZkCoordinationServiceFactory.java | 39 ++
.../apache/samza/zk/ZkCoordinationUtils.java | 66 +++
.../org/apache/samza/zk/ZkJobCoordinator.java | 20 +-
.../samza/zk/ZkJobCoordinatorFactory.java | 11 +-
.../java/org/apache/samza/zk/ZkKeyBuilder.java | 4 +-
.../org/apache/samza/zk/ZkLeaderElector.java | 90 ++--
.../org/apache/samza/zk/ZkProcessorLatch.java | 70 +++
.../main/java/org/apache/samza/zk/ZkUtils.java | 3 +
.../org/apache/samza/coordinator/Latch.java | 33 ++
.../org/apache/samza/task/TestAsyncRunLoop.java | 1 +
.../zk/TestZkBarrierForVersionUpgrade.java | 88 ++--
.../org/apache/samza/zk/TestZkKeyBuilder.java | 2 +-
.../apache/samza/zk/TestZkLeaderElector.java | 514 ++++++++++---------
.../apache/samza/zk/TestZkProcessorLatch.java | 294 +++++++++++
.../test/processor/TestStreamProcessor.java | 37 +-
settings.gradle | 1 +
33 files changed, 1237 insertions(+), 514 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/checkstyle/checkstyle-suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle-suppressions.xml b/checkstyle/checkstyle-suppressions.xml
new file mode 100644
index 0000000..428ac93
--- /dev/null
+++ b/checkstyle/checkstyle-suppressions.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0"?>
+
+<!DOCTYPE suppressions PUBLIC
+ "-//Puppy Crawl//DTD Suppressions 1.0//EN"
+ "http://www.puppycrawl.com/dtds/suppressions_1_0.dtd">
+
+<!--
+ // Licensed to the Apache Software Foundation (ASF) under one or more
+ // contributor license agreements. See the NOTICE file distributed with
+ // this work for additional information regarding copyright ownership.
+ // The ASF licenses this file to You under the Apache License, Version 2.0
+ // (the "License"); you may not use this file except in compliance with
+ // the License. You may obtain a copy of the License at
+ //
+ // http://www.apache.org/licenses/LICENSE-2.0
+ //
+ // Unless required by applicable law or agreed to in writing, software
+ // distributed under the License is distributed on an "AS IS" BASIS,
+ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ // See the License for the specific language governing permissions and
+ // limitations under the License.
+-->
+
+<suppressions>
+<!-- example
+ <suppress checks="Indentation"
+ files="TestZkProcessorLatch.java"
+ lines="91-275"/>
+ -->
+</suppressions>
+
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 775d674..479896e 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE module PUBLIC
- "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+ "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
"http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
<!--
// Licensed to the Apache Software Foundation (ASF) under one or more
@@ -9,29 +9,30 @@
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
-//
+//
// http://www.apache.org/licenses/LICENSE-2.0
-//
+//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
--->
+-->
<module name="Checker">
<property name="localeLanguage" value="en"/>
+
<!-- allow suppression for specific files -->
<module name="SuppressionCommentFilter"/>
<module name="FileTabCharacter"/>
-
+
<!-- header: use one star only -->
<module name="RegexpHeader">
<property name="header" value="/\*\nLicensed to the Apache.*"/>
</module>
-
+
<module name="TreeWalker">
-
+
<!-- code cleanup -->
<module name="UnusedImports"/>
<module name="FileContentsHolder"/>
@@ -42,7 +43,7 @@
<module name="OneStatementPerLine"/>
<module name="UnnecessaryParentheses" />
<module name="SimplifyBooleanReturn"/>
-
+
<!-- style -->
<module name="DefaultComesLast"/>
<module name="EmptyStatement"/>
@@ -61,7 +62,7 @@
<module name="ParameterName"/>
<module name="StaticVariableName"/>
<module name="TypeName"/>
-
+
<!-- whitespace -->
<module name="GenericWhitespace"/>
<module name="NoWhitespaceBefore"/>
@@ -82,4 +83,7 @@
<module name="ParenPad"/>
<module name="TypecastParenPad"/>
</module>
+ <module name="SuppressionFilter">
+ <property name="file" value="checkstyle/checkstyle-suppressions.xml"/>
+ </module>
</module>
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
----------------------------------------------------------------------
diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
index e3839ca..fd1e039 100644
--- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
+++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
@@ -284,7 +284,7 @@ public class ConfigManager {
//killing the current job
log.info("Killing the current job");
yarnUtil.killApplication(applicationId);
- //clear the global variables
+ //reset the global variables
coordinatorServerURL = null;
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
index 946a308..e0c599d 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
@@ -23,6 +23,7 @@ import com.google.common.base.Strings;
public class JobCoordinatorConfig extends MapConfig {
public static final String JOB_COORDINATOR_FACTORY = "job-coordinator.factory";
+ public static final String JOB_COORDINATIOIN_SERVICE_FACTORY = "job-coordinationService.factory";
public JobCoordinatorConfig(Config config) {
super(config);
@@ -37,4 +38,14 @@ public class JobCoordinatorConfig extends MapConfig {
return jobCoordinatorFactoryClassName;
}
+
+ public String getJobCoordinationServiceFactoryClassName() {
+ String jobCooridanationFactoryClassName = get(JOB_COORDINATIOIN_SERVICE_FACTORY, "org.apache.samza.zk.ZkCoordinationServiceFactory");
+ if (Strings.isNullOrEmpty(jobCooridanationFactoryClassName)) {
+ throw new ConfigException(
+ String.format("config '%s' is set to empty. Cannot instantiate coordination utils!", JOB_COORDINATOR_FACTORY));
+ }
+
+ return jobCooridanationFactoryClassName;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
index f26b2d9..fc483eb 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
@@ -27,6 +27,8 @@ public class ZkConfig extends MapConfig {
public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 60000;
public static final int DEFAULT_SESSION_TIMEOUT_MS = 30000;
+ public static final String ZK_BARRIER_TIMEOUT_MS = "coordinator.zk.barrier-timeout-ms";
+ public static final int DEFAULT_BARRIER_TIMEOUT_MS = 40000;
public ZkConfig(Config config) {
super(config);
@@ -46,4 +48,8 @@ public class ZkConfig extends MapConfig {
public int getZkConnectionTimeoutMs() {
return getInt(ZK_CONNECTION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS);
}
+
+ public int getZkBarrierTimeoutMs() {
+ return getInt(ZK_BARRIER_TIMEOUT_MS, DEFAULT_BARRIER_TIMEOUT_MS);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java
new file mode 100644
index 0000000..145d81c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import java.util.List;
+
+
+/**
+ * Interface for a barrier - to allow synchronization between different processors to switch to a newly published
+ * JobModel.
+ */
+public interface BarrierForVersionUpgrade {
+ /**
+ * Barrier is usually started by the leader.
+ * @param version - for which the barrier is created
+ * @param participatns - list of participants that need to join for barrier to complete
+ */
+ void start(String version, List<String> participatns);
+
+ /**
+ * Called by the processor.
+ * Updates the processor readiness to use the new version and wait on the barrier, until all other processors
+ * joined.
+ * The call is async. The callback will be invoked when the barrier is reached.
+ * @param version - for which the barrier waits
+ * @param thisProcessorsName as it appears in the list of processors.
+ * @param callback will be invoked, when barrier is reached.
+ */
+ void waitForBarrier(String version, String thisProcessorsName, Runnable callback);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationServiceFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationServiceFactory.java
new file mode 100644
index 0000000..497d3e0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationServiceFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * factory to instantiate a c{@link CoordinationUtils} service
+ */
+public interface CoordinationServiceFactory {
+ /**
+ * get a unique service instance
+ * @param groupId - unique id to identify the service
+ * @param participantId - a unique id that identifies the participant in the service
+ * @param updatedConfig - configs, to define the details of the service
+ * @return a unique service instance
+ */
+ CoordinationUtils getCoordinationService(String groupId, String participantId, Config updatedConfig);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
new file mode 100644
index 0000000..39bda24
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+/** THIS API WILL CHANGE
+ *
+ * Coordination service provides synchronization primitives.
+ * The actual implementation (for example ZK based) is left to each implementation class.
+ * This service provide three primitives:
+ * - LeaderElection
+ * - Latch
+ * - barrier for version upgrades
+ */
+public interface CoordinationUtils {
+
+ /**
+ * reset the internal structure. Does not happen automatically with stop()
+ */
+ void reset();
+
+
+ // facilities for group coordination
+ LeaderElector getLeaderElector(); // leaderElector is unique based on the groupId
+
+ Latch getLatch(int size, String latchId);
+
+ BarrierForVersionUpgrade getBarrier(String barrierId);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
index 3da70e0..056bdb1 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
@@ -22,6 +22,7 @@ import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;
import org.apache.samza.processor.SamzaContainerController;
+
@InterfaceStability.Evolving
public interface JobCoordinatorFactory {
/**
@@ -31,5 +32,6 @@ public interface JobCoordinatorFactory {
* pause the container and add/remove tasks
* @return An instance of IJobCoordinator
*/
- JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController);
+ JobCoordinator getJobCoordinator(int processorId, Config config,
+ SamzaContainerController containerController, CoordinationUtils coordinationUtils);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java b/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java
new file mode 100644
index 0000000..c6c8bbb
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElector.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * Leader elector async primitives, implemented based on ZK.
+ * The callback is a async, and run in a separate (common) thread.
+ * So the caller should never block in the callback.
+ * Callbacks will be delivered on callback at a time. Others will wait.
+ *
+ */
+@InterfaceStability.Evolving
+public interface LeaderElector {
+ /**
+ * Async method that helps the caller participate in leader election.
+ *
+ * @param leaderElectorListener to be invoked if the caller is chosen as a leader through the leader election process
+ */
+ void tryBecomeLeader(LeaderElectorListener leaderElectorListener);
+
+ /**
+ * Method that allows a caller to resign from leadership role. Caller can resign from leadership due to various
+ * reasons such as shutdown, connection failures etc.
+ * This method should clear any state created by the leader and clean-up the resources used by the leader.
+ */
+ void resignLeadership();
+
+ /**
+ * Method that can be used to know if the caller is the current leader or not
+ *
+ * @return True, if the caller is the current leader. False, otherwise
+ */
+ boolean amILeader();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElectorListener.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElectorListener.java b/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElectorListener.java
new file mode 100644
index 0000000..ef61229
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/LeaderElectorListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+/**
+ * This call back should be passed to {@link LeaderElector#tryBecomeLeader} and
+ * will be invoked if the caller becomes the leader.
+ */
+public interface LeaderElectorListener {
+ void onBecomingLeader();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java b/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java
deleted file mode 100644
index 94e3311..0000000
--- a/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator.leaderelection;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-@InterfaceStability.Evolving
-public interface LeaderElector {
- /**
- * Method that helps the caller participate in leader election and returns when the participation is complete
- *
- * @return True, if caller is chosen as a leader through the leader election process. False, otherwise.
- */
- boolean tryBecomeLeader();
-
- /**
- * Method that allows a caller to resign from leadership role. Caller can resign from leadership due to various
- * reasons such as shutdown, connection failures etc.
- * This method should clear any state created by the leader and clean-up the resources used by the leader.
- */
- void resignLeadership();
-
- /**
- * Method that can be used to know if the caller is the current leader or not
- *
- * @return True, if the caller is the current leader. False, otherwise
- */
- boolean amILeader();
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 15d9b9d..3a62275 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -23,6 +23,8 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationServiceFactory;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
import org.apache.samza.metrics.MetricsReporter;
@@ -124,11 +126,17 @@ public class StreamProcessor {
String.valueOf(processorId),
customMetricsReporters);
+ CoordinationUtils jobCooridanationService = Util.
+ <CoordinationServiceFactory>getObj(
+ new JobCoordinatorConfig(updatedConfig)
+ .getJobCoordinationServiceFactoryClassName())
+ .getCoordinationService("groupId", String.valueOf(processorId), updatedConfig);
+
this.jobCoordinator = Util.
<JobCoordinatorFactory>getObj(
new JobCoordinatorConfig(updatedConfig)
.getJobCoordinatorFactoryClassName())
- .getJobCoordinator(processorId, updatedConfig, containerController);
+ .getJobCoordinator(processorId, updatedConfig, containerController, jobCooridanationService);
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
index 7ca85c0..3588dce 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
@@ -19,13 +19,15 @@
package org.apache.samza.standalone;
import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
import org.apache.samza.processor.SamzaContainerController;
public class StandaloneJobCoordinatorFactory implements JobCoordinatorFactory {
@Override
- public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
+ public JobCoordinator getJobCoordinator(int processorId, Config config,
+ SamzaContainerController containerController, CoordinationUtils coordinationUtils) {
return new StandaloneJobCoordinator(processorId, config, containerController);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
deleted file mode 100644
index 553e730..0000000
--- a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.zk;
-
-/**
- * Interface for a barrier - to allow synchronization between different processors to switch to a newly published
- * JobModel.
- */
-public interface BarrierForVersionUpgrade {
- /**
- * Barrier is usually started by the leader.
- */
- void start();
-
- /**
- * Called by the processor.
- * Updates the processor readiness to use the new version and wait on the barrier, until all other processors
- * joined.
- * The call is async. The callback will be invoked when the barrier is reached.
- * @param thisProcessorsName as it appears in the list of processors.
- * @param callback will be invoked, when barrier is reached.
- */
- void waitForBarrier(String thisProcessorsName, Runnable callback);
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index d0332ab..0afd840 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -26,6 +26,7 @@ import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.exception.ZkBadVersionException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.BarrierForVersionUpgrade;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,34 +44,25 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
private final ZkKeyBuilder keyBuilder;
private final static String BARRIER_DONE = "done";
private final static String BARRIER_TIMED_OUT = "TIMED_OUT";
- private final static long BARRIER_TIMED_OUT_MS = 60 * 1000;
private final static Logger LOG = LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class);
private final ScheduleAfterDebounceTime debounceTimer;
private final String barrierPrefix;
- private final String barrierPath;
- private final String barrierDonePath;
- private final String barrierProcessors;
- private final String version;
- private final List<String> processorsNames;
+ private String barrierPath;
+ private String barrierDonePath;
+ private String barrierProcessors;
private static final String VERSION_UPGRADE_TIMEOUT_TIMER = "VersionUpgradeTimeout";
+ private final long barrierTimeoutMS;
- public ZkBarrierForVersionUpgrade(ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer, String version, List<String> processorsNames) {
+ public ZkBarrierForVersionUpgrade(String barrierId, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer, long barrierTimeoutMS) {
this.zkUtils = zkUtils;
keyBuilder = zkUtils.getKeyBuilder();
- barrierPrefix = keyBuilder.getJobModelVersionBarrierPrefix();
- this.debounceTimer = debounceTimer;
+ barrierPrefix = keyBuilder.getJobModelVersionBarrierPrefix(barrierId);
- barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
- barrierDonePath = String.format("%s/barrier_done", barrierPath);
- barrierProcessors = String.format("%s/barrier_processors", barrierPath);
-
- this.version = version;
- this.processorsNames = processorsNames;
-
- zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, barrierPath, barrierProcessors, barrierDonePath});
+ this.debounceTimer = debounceTimer;
+ this.barrierTimeoutMS = barrierTimeoutMS;
}
/**
@@ -83,7 +75,7 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
}
protected long getBarrierTimeOutMs() {
- return BARRIER_TIMED_OUT_MS;
+ return barrierTimeoutMS;
}
private void timerOff(final String version, final Stat currentStatOfBarrierDone) {
@@ -91,49 +83,62 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
// write a new value "TIMED_OUT", if the value was changed since previous value, make sure it was changed to "DONE"
zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_TIMED_OUT, currentStatOfBarrierDone.getVersion());
} catch (ZkBadVersionException e) {
- // failed to write, make sure the value is "DONE"
- LOG.warn("Barrier timeout write failed");
+ // Expected. failed to write, make sure the value is "DONE"
+ ///LOG.("Barrier timeout write failed");
String done = zkUtils.getZkClient().<String>readData(barrierDonePath);
+ LOG.info("Barrier timeout expired, but done=" + done);
if (!done.equals(BARRIER_DONE)) {
throw new SamzaException("Failed to write to the barrier_done, version=" + version, e);
}
}
}
+ private void setPaths(String version) {
+ barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+ barrierDonePath = String.format("%s/barrier_done", barrierPath);
+ barrierProcessors = String.format("%s/barrier_processors", barrierPath);
+
+ zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, barrierPath, barrierProcessors, barrierDonePath});
+ }
+
@Override
- public void start() {
+ public void start(String version, List<String> participants) {
+
+ setPaths(version);
// subscribe for processor's list changes
LOG.info("Subscribing for child changes at " + barrierProcessors);
- zkUtils.getZkClient().subscribeChildChanges(barrierProcessors, new ZkBarrierChangeHandler(version, processorsNames));
+ zkUtils.getZkClient().subscribeChildChanges(barrierProcessors, new ZkBarrierChangeHandler(participants));
// create a timer for time-out
Stat currentStatOfBarrierDone = new Stat();
zkUtils.getZkClient().readData(barrierDonePath, currentStatOfBarrierDone);
+
setTimer(version, getBarrierTimeOutMs(), currentStatOfBarrierDone);
}
@Override
- public void waitForBarrier(String processorsName, Runnable callback) {
- final String barrierProcessorThis = String.format("%s/%s", barrierProcessors, processorsName);
+ public void waitForBarrier(String version, String participantName, Runnable callback) {
- // update the barrier for this processor
- LOG.info("Creating a child for barrier at " + barrierProcessorThis);
- zkUtils.getZkClient().createPersistent(barrierProcessorThis);
+ setPaths(version);
+ final String barrierProcessorThis = String.format("%s/%s", barrierProcessors, participantName);
// now subscribe for the barrier
zkUtils.getZkClient().subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, debounceTimer, callback));
+
+ // update the barrier for this processor
+ LOG.info("Creating a child for barrier at " + barrierProcessorThis);
+ zkUtils.getZkClient().createPersistent(barrierProcessorThis);
}
/**
- * listener for the subscription.
+ * Listener for the subscription for the list of participants.
+ * This method will identify when all the participants have joined.
*/
class ZkBarrierChangeHandler implements IZkChildListener {
- private final String version;
private final List<String> names;
- public ZkBarrierChangeHandler(String version, List<String> names) {
- this.version = version;
+ public ZkBarrierChangeHandler(List<String> names) {
this.names = names;
}
@@ -151,7 +156,7 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
if (CollectionUtils.containsAll(names, currentChildren)) {
LOG.info("ALl nodes reached the barrier");
LOG.info("Writing BARRIER DONE to " + barrierDonePath);
- zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE);
+ zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE); // this will trigger notifications
}
}
}
@@ -173,16 +178,13 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
String done = (String) data;
LOG.info("got notification about barrier path=" + barrierPathDone + "; done=" + done);
if (done.equals(BARRIER_DONE)) {
- zkUtils.unsubscribeDataChanges(barrierPathDone, this);
debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, callback);
} else if (done.equals(BARRIER_TIMED_OUT)) {
// timed out
LOG.warn("Barrier for " + dataPath + " timed out");
- LOG.info("Barrier for " + dataPath + " timed out");
- zkUtils.unsubscribeDataChanges(barrierPathDone, this);
}
- // we do not need to resubscribe because, ZkClient library does it for us.
-
+ // in any case we unsubscribe
+ zkUtils.unsubscribeDataChanges(barrierPathDone, this);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
index 70c8a37..4570a62 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -22,6 +22,7 @@ package org.apache.samza.zk;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.LeaderElectorListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,14 +43,7 @@ public class ZkControllerImpl implements ZkController {
this.processorIdStr = processorIdStr;
this.zkUtils = zkUtils;
this.zkControllerListener = zkControllerListener;
- this.leaderElector = new ZkLeaderElector(processorIdStr, zkUtils,
- new ZkLeaderElector.ZkLeaderElectorListener() {
- @Override
- public void onBecomingLeader() {
- onBecomeLeader();
- }
- }
- );
+ this.leaderElector = new ZkLeaderElector(processorIdStr, zkUtils, debounceTimer);
this.debounceTimer = debounceTimer;
init();
@@ -76,7 +70,12 @@ public class ZkControllerImpl implements ZkController {
// TODO - make a loop here with some number of attempts.
// possibly split into two method - becomeLeader() and becomeParticipant()
- leaderElector.tryBecomeLeader();
+ leaderElector.tryBecomeLeader(new LeaderElectorListener() {
+ @Override
+ public void onBecomingLeader() {
+ onBecomeLeader();
+ }
+ });
// subscribe to JobModel version updates
zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(debounceTimer));
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
new file mode 100644
index 0000000..cc454e3
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationServiceFactory;
+
+
+public class ZkCoordinationServiceFactory implements CoordinationServiceFactory {
+
+
+ synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
+ ZkConfig zkConfig = new ZkConfig(config);
+ ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+ ZkUtils zkUtils = new ZkUtils(participantId, new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
+ ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+ return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, debounceTimer);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
new file mode 100644
index 0000000..e381a41
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.BarrierForVersionUpgrade;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.Latch;
+import org.apache.samza.coordinator.LeaderElector;
+
+
+public class ZkCoordinationUtils implements CoordinationUtils {
+ public final ZkConfig zkConfig;
+ public final ZkUtils zkUtils;
+ public final String processorIdStr;
+ public final ScheduleAfterDebounceTime debounceTimer;
+
+ public ZkCoordinationUtils(String processorId, ZkConfig zkConfig, ZkUtils zkUtils,
+ ScheduleAfterDebounceTime debounceTimer) {
+ this.zkConfig = zkConfig;
+ this.zkUtils = zkUtils;
+ this.processorIdStr = processorId;
+ this.debounceTimer = debounceTimer;
+ }
+
+ @Override
+ public void reset() {
+ zkUtils.deleteRoot();
+ }
+
+ @Override
+ public LeaderElector getLeaderElector() {
+ return new ZkLeaderElector(processorIdStr, zkUtils, debounceTimer);
+ }
+
+ @Override
+ public Latch getLatch(int size, String latchId) {
+ return new ZkProcessorLatch(size, latchId, processorIdStr, zkConfig, zkUtils);
+ }
+
+ @Override
+ public BarrierForVersionUpgrade getBarrier(String barrierId) {
+ return new ZkBarrierForVersionUpgrade(barrierId, zkUtils, debounceTimer, zkConfig.getZkBarrierTimeoutMs());
+ }
+ @VisibleForTesting
+ public ZkUtils getZkUtils() {
+ return zkUtils;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 9e5dd84..87d6bac 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.job.model.JobModel;
@@ -44,22 +45,24 @@ import org.slf4j.LoggerFactory;
*/
public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
private static final Logger log = LoggerFactory.getLogger(ZkJobCoordinator.class);
+ private static final String JOB_MODEL_VERSION_BARRIER = "JobModelVersion";
private final ZkUtils zkUtils;
private final int processorId;
private final ZkController zkController;
private final SamzaContainerController containerController;
- private BarrierForVersionUpgrade barrier;
private final ScheduleAfterDebounceTime debounceTimer;
private final StreamMetadataCache streamMetadataCache;
private final ZkKeyBuilder keyBuilder;
private final Config config;
+ private final CoordinationUtils coordinationUtils;
private JobModel newJobModel;
private String newJobModelVersion; // version published in ZK (by the leader)
private JobModel jobModel;
- public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils, SamzaContainerController containerController) {
+ public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils,
+ SamzaContainerController containerController, CoordinationUtils coordinationUtils) {
this.zkUtils = zkUtils;
this.keyBuilder = zkUtils.getKeyBuilder();
this.debounceTimer = debounceTimer;
@@ -67,6 +70,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
this.containerController = containerController;
this.zkController = new ZkControllerImpl(String.valueOf(processorId), zkUtils, debounceTimer, this);
this.config = config;
+ this.coordinationUtils = coordinationUtils;
streamMetadataCache = getStreamMetadataCache();
}
@@ -101,6 +105,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
@Override
public void stop() {
zkController.stop();
+ if (containerController != null)
+ containerController.stopContainer();
}
@Override
@@ -152,7 +158,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
String zkProcessorId = keyBuilder.parseIdFromPath(currentPath);
// update ZK and wait for all the processors to get this new version
- barrier.waitForBarrier(String.valueOf(zkProcessorId), new Runnable() {
+ ZkBarrierForVersionUpgrade barrier = (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier(JOB_MODEL_VERSION_BARRIER);
+ barrier.waitForBarrier(version, String.valueOf(zkProcessorId), new Runnable() {
@Override
public void run() {
onNewJobModelConfirmed(version);
@@ -199,7 +206,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
}
log.info("generate new job model: processorsIds: " + sb.toString());
- jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, containerIds);
+ jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
+ containerIds);
log.info("pid=" + processorId + "Generated jobModel: " + jobModel);
@@ -208,8 +216,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion + ";jm=" + jobModel);
// start the barrier for the job model update
- barrier = new ZkBarrierForVersionUpgrade(zkUtils, debounceTimer, nextJMVersion, currentProcessors);
- barrier.start();
+ ZkBarrierForVersionUpgrade barrier = (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier(JOB_MODEL_VERSION_BARRIER);
+ barrier.start(nextJMVersion, currentProcessors);
// publish new JobModel version
zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index e211f70..22ead65 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -23,6 +23,7 @@ import org.I0Itec.zkclient.ZkClient;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
import org.apache.samza.processor.SamzaContainerController;
@@ -36,22 +37,24 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
* @return An instance of IJobCoordinator
*/
@Override
- public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
+ public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController, CoordinationUtils coordinationUtils) {
JobConfig jobConfig = new JobConfig(config);
- String groupName = String.format("%s-%s", jobConfig.getName(), jobConfig.getJobId());
+ String groupName = String.format("%s-%s", jobConfig.getName().get(), jobConfig.getJobId().get());
ZkConfig zkConfig = new ZkConfig(config);
+ String processorIdStr = String.valueOf(processorId);
ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+
return new ZkJobCoordinator(
processorId,
config,
debounceTimer,
new ZkUtils(
- String.valueOf(processorId),
+ processorIdStr,
new ZkKeyBuilder(groupName),
zkClient,
zkConfig.getZkConnectionTimeoutMs()
),
- containerController);
+ containerController, coordinationUtils);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index 0a8f37e..44f83e4 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -88,7 +88,7 @@ public class ZkKeyBuilder {
return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion);
}
- public String getJobModelVersionBarrierPrefix() {
- return String.format("/%s/versionBarriers", pathPrefix);
+ public String getJobModelVersionBarrierPrefix(String barrierId) {
+ return String.format("/%s/%s/versionBarriers", pathPrefix, barrierId);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index b9bdf11..8a027d9 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -20,17 +20,18 @@
package org.apache.samza.zk;
import com.google.common.annotations.VisibleForTesting;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.samza.SamzaException;
-import org.apache.samza.coordinator.leaderelection.LeaderElector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.LeaderElectorListener;
+import org.apache.samza.coordinator.LeaderElector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* <p>
@@ -43,37 +44,32 @@ import java.util.concurrent.atomic.AtomicBoolean;
* </p>
* */
public class ZkLeaderElector implements LeaderElector {
- public static final Logger LOGGER = LoggerFactory.getLogger(ZkLeaderElector.class);
+ public static final Logger LOG = LoggerFactory.getLogger(ZkLeaderElector.class);
private final ZkUtils zkUtils;
private final String processorIdStr;
private final ZkKeyBuilder keyBuilder;
private final String hostName;
+ private final ScheduleAfterDebounceTime debounceTimer;
private AtomicBoolean isLeader = new AtomicBoolean(false);
- private final IZkDataListener previousProcessorChangeListener;
- ZkLeaderElectorListener zkLeaderElectorListener;
+ private IZkDataListener previousProcessorChangeListener;
private String currentSubscription = null;
private final Random random = new Random();
@VisibleForTesting
- ZkLeaderElector(String processorIdStr,
- ZkUtils zkUtils,
- ZkLeaderElectorListener zkLeaderElectorListener,
- IZkDataListener previousProcessorChangeListener) {
+ public void setPreviousProcessorChangeListener(IZkDataListener previousProcessorChangeListener) {
+ this.previousProcessorChangeListener = previousProcessorChangeListener;
+ }
+
+ public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer) {
this.processorIdStr = processorIdStr;
this.zkUtils = zkUtils;
- this.keyBuilder = this.zkUtils.getKeyBuilder();
+ this.keyBuilder = zkUtils.getKeyBuilder();
this.hostName = getHostName();
- this.zkLeaderElectorListener = zkLeaderElectorListener; // listener to inform the caller that they have become the leader
- if (previousProcessorChangeListener == null)
- this.previousProcessorChangeListener = new PreviousProcessorChangeListener();
- else
- this.previousProcessorChangeListener = previousProcessorChangeListener;
- }
-
- public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, ZkLeaderElectorListener zkLeaderElectorListener) {
- this(processorIdStr, zkUtils, zkLeaderElectorListener, null);
+ this.debounceTimer = (debounceTimer != null) ? debounceTimer : new ScheduleAfterDebounceTime();
+// String [] paths = new String[]{keyBuilder.getProcessorsPath()};
+// zkUtils.makeSurePersistentPathsExists(paths);
}
// TODO: This should go away once we integrate with Zk based Job Coordinator
@@ -81,45 +77,47 @@ public class ZkLeaderElector implements LeaderElector {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
- LOGGER.error("Failed to fetch hostname of the processor", e);
+ LOG.error("Failed to fetch hostname of the processor", e);
throw new SamzaException(e);
}
}
- public interface ZkLeaderElectorListener {
- void onBecomingLeader();
- }
-
@Override
- public boolean tryBecomeLeader() {
- String currentPath = zkUtils.registerProcessorAndGetId(hostName);
+ public void tryBecomeLeader(LeaderElectorListener leaderElectorListener) {
+ String currentPath = zkUtils.registerProcessorAndGetId(hostName + " " + processorIdStr);
List<String> children = zkUtils.getSortedActiveProcessors();
- LOGGER.debug(zLog("Current active processors - " + children));
+ LOG.debug(zLog("Current active processors - " + children));
int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(currentPath));
+ LOG.info("tryBecomeLeader: index = " + index + " for path=" + currentPath + " out of " + Arrays.toString(children.toArray()));
if (children.size() == 0 || index == -1) {
throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!");
}
if (index == 0) {
isLeader.getAndSet(true);
- LOGGER.info(zLog("Eligible to become the leader!"));
- zkLeaderElectorListener.onBecomingLeader(); // inform the caller
- return true;
+ LOG.info(zLog("Eligible to become the leader!"));
+ debounceTimer.scheduleAfterDebounceTime("ON_BECOMING_LEADER", 1, () -> leaderElectorListener.onBecomingLeader()); // inform the caller
+ return;
}
isLeader.getAndSet(false);
- LOGGER.info("Index = " + index + " Not eligible to be a leader yet!");
+ LOG.info("Index = " + index + " Not eligible to be a leader yet!");
String predecessor = children.get(index - 1);
if (!predecessor.equals(currentSubscription)) {
if (currentSubscription != null) {
- LOGGER.debug(zLog("Unsubscribing data change for " + currentSubscription));
+
+ // callback in case if the previous node gets deleted (when previous processor dies)
+ if (previousProcessorChangeListener == null)
+ previousProcessorChangeListener = new PreviousProcessorChangeListener(leaderElectorListener);
+
+ LOG.debug(zLog("Unsubscribing data change for " + currentSubscription));
zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
previousProcessorChangeListener);
}
currentSubscription = predecessor;
- LOGGER.info(zLog("Subscribing data change for " + predecessor));
+ LOG.info(zLog("Subscribing data change for " + predecessor));
zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
previousProcessorChangeListener);
}
@@ -130,17 +128,16 @@ public class ZkLeaderElector implements LeaderElector {
*/
boolean predecessorExists = zkUtils.exists(keyBuilder.getProcessorsPath() + "/" + currentSubscription);
if (predecessorExists) {
- LOGGER.info(zLog("Predecessor still exists. Current subscription is valid. Continuing as non-leader."));
+ LOG.info(zLog("Predecessor still exists. Current subscription is valid. Continuing as non-leader."));
} else {
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
Thread.interrupted();
}
- LOGGER.info(zLog("Predecessor doesn't exist anymore. Trying to become leader again..."));
- return tryBecomeLeader();
+ LOG.info(zLog("Predecessor doesn't exist anymore. Trying to become leader again..."));
+ tryBecomeLeader(leaderElectorListener);
}
- return false;
}
@Override
@@ -159,16 +156,21 @@ public class ZkLeaderElector implements LeaderElector {
// Only by non-leaders
class PreviousProcessorChangeListener implements IZkDataListener {
+ private final LeaderElectorListener leaderElectorListener;
+ PreviousProcessorChangeListener(LeaderElectorListener leaderElectorListener) {
+ this.leaderElectorListener = leaderElectorListener;
+ }
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
- LOGGER.debug("Data change on path: " + dataPath + " Data: " + data);
+ LOG.debug("Data change on path: " + dataPath + " Data: " + data);
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
- LOGGER.info(zLog("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again..."));
- tryBecomeLeader();
+ LOG.info(
+ zLog("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again..."));
+ tryBecomeLeader(leaderElectorListener);
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
new file mode 100644
index 0000000..4394302
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.Latch;
+
+
+/*
+ * Latch of the sizeN is open when countDown() was called N times.
+ * In this implementation a sequential node is created on every call of countDown().
+ * When Nth node is created await() call returns.
+ */
+public class ZkProcessorLatch implements Latch {
+
+ private final ZkConfig zkConfig;
+ private final ZkUtils zkUtils;
+ private final String processorIdStr;
+ private final ZkKeyBuilder keyBuilder;
+ private final String latchId;
+
+ private final String latchPath;
+ private final String targetPath;
+
+ private final static String LATCH_PATH = "latch";
+ private final int size; // latch size
+
+ public ZkProcessorLatch(int size, String latchId, String participantId, ZkConfig zkConfig, ZkUtils zkUtils) {
+ this.zkConfig = zkConfig;
+ this.zkUtils = zkUtils;
+ this.processorIdStr = participantId;
+ this.latchId = latchId;
+ this.keyBuilder = this.zkUtils.getKeyBuilder();
+ this.size = size;
+
+ latchPath = String.format("%s/%s", keyBuilder.getRootPath(), LATCH_PATH + "_" + latchId);
+ zkUtils.makeSurePersistentPathsExists(new String[] {latchPath});
+ targetPath = String.format("%s/%010d", latchPath, size - 1);
+ System.out.println("targetPath " + targetPath);
+ }
+
+ @Override
+ public void await(long timeout, TimeUnit tu) {
+ zkUtils.getZkClient().waitUntilExists(targetPath, TimeUnit.MILLISECONDS, timeout);
+ }
+
+ @Override
+ public void countDown() {
+ // create persistent (should be ephemeral? Probably not)
+ String path = zkUtils.getZkClient().createPersistentSequential(latchPath + "/", processorIdStr);
+ System.out.println("countDown created " + path);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index e8170e3..7a9b4d5 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -111,8 +111,11 @@ public class ZkUtils {
ephemeralPath =
zkClient.createEphemeralSequential(
keyBuilder.getProcessorsPath() + "/", data);
+
+ LOG.info("newly generated path for " + data + " is " + ephemeralPath);
return ephemeralPath;
} else {
+ LOG.info("existing path for " + data + " is " + ephemeralPath);
return ephemeralPath;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/main/scala/org/apache/samza/coordinator/Latch.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/Latch.java b/samza-core/src/main/scala/org/apache/samza/coordinator/Latch.java
new file mode 100644
index 0000000..5ca9138
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/Latch.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+
+/**
+ * latch implementation for the coordination service.
+ * Supports different size latches.
+ * await() returns when either latch reaches N (N participants call countDown()) or timeout.
+ */
+public interface Latch {
+ void await(long timeout, TimeUnit tu) throws TimeoutException;
+ void countDown();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 35f9eb3..31cbe79 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -521,6 +521,7 @@ public class TestAsyncRunLoop {
}
@Test
+ @Ignore
public void testCommitBehaviourWhenAsyncCommitIsEnabled() throws InterruptedException {
commitRequest = TaskCoordinator.RequestScope.CURRENT_TASK;
maxMessagesInFlight = 2;
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index a1af782..c0c0e6a 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -19,9 +19,16 @@
package org.apache.samza.zk;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import junit.framework.Assert;
-import org.I0Itec.zkclient.ZkConnection;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.BarrierForVersionUpgrade;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationServiceFactory;
import org.apache.samza.testUtils.EmbeddedZookeeper;
import org.junit.After;
import org.junit.AfterClass;
@@ -32,33 +39,34 @@ import org.junit.Test;
public class TestZkBarrierForVersionUpgrade {
private static EmbeddedZookeeper zkServer = null;
- private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
- private String testZkConnectionString = null;
- private ZkUtils testZkUtils = null;
- private static final int SESSION_TIMEOUT_MS = 20000;
- private static final int CONNECTION_TIMEOUT_MS = 10000;
+ private static String testZkConnectionString = null;
+ private static CoordinationUtils coordinationUtils;
+
@BeforeClass
public static void setup() throws InterruptedException {
zkServer = new EmbeddedZookeeper();
zkServer.setup();
+ testZkConnectionString = "localhost:" + zkServer.getPort();
}
@Before
public void testSetup() {
- testZkConnectionString = "localhost:" + zkServer.getPort();
- try {
- testZkUtils = getZkUtilsWithNewClient();
- } catch (Exception e) {
- Assert.fail("Client connection setup failed. Aborting tests..");
- }
+ String groupId = "group1";
+ String processorId = "p1";
+ Map<String, String> map = new HashMap<>();
+ map.put(ZkConfig.ZK_CONNECT, testZkConnectionString);
+ map.put(ZkConfig.ZK_BARRIER_TIMEOUT_MS, "200");
+ Config config = new MapConfig(map);
+
+ CoordinationServiceFactory serviceFactory = new ZkCoordinationServiceFactory();
+ coordinationUtils = serviceFactory.getCoordinationService(groupId, processorId, config);
+ coordinationUtils.reset();
}
@After
- public void testTeardown() {
- testZkUtils.deleteRoot();
- testZkUtils.close();
- testZkUtils = null;
+ public void testTearDown() {
+ coordinationUtils.reset();
}
@AfterClass
@@ -68,13 +76,13 @@ public class TestZkBarrierForVersionUpgrade {
@Test
public void testZkBarrierForVersionUpgrade() {
- ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+ String barrierId = "b1";
String ver = "1";
List<String> processors = new ArrayList<String>();
processors.add("p1");
processors.add("p2");
- ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer, ver, processors);
+ BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(barrierId);
class Status {
boolean p1 = false;
@@ -82,16 +90,16 @@ public class TestZkBarrierForVersionUpgrade {
}
final Status s = new Status();
- barrier.start();
+ barrier.start(ver, processors);
- barrier.waitForBarrier("p1", new Runnable() {
+ barrier.waitForBarrier(ver, "p1", new Runnable() {
@Override
public void run() {
s.p1 = true;
}
});
- barrier.waitForBarrier("p2", new Runnable() {
+ barrier.waitForBarrier(ver, "p2", new Runnable() {
@Override
public void run() {
s.p2 = true;
@@ -103,14 +111,15 @@ public class TestZkBarrierForVersionUpgrade {
@Test
public void testNegativeZkBarrierForVersionUpgrade() {
- ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+
+ String barrierId = "b1";
String ver = "1";
List<String> processors = new ArrayList<String>();
processors.add("p1");
processors.add("p2");
processors.add("p3");
- ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer, ver, processors);
+ BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(barrierId);
class Status {
boolean p1 = false;
@@ -119,16 +128,16 @@ public class TestZkBarrierForVersionUpgrade {
}
final Status s = new Status();
- barrier.start();
+ barrier.start(ver, processors);
- barrier.waitForBarrier("p1", new Runnable() {
+ barrier.waitForBarrier(ver, "p1", new Runnable() {
@Override
public void run() {
s.p1 = true;
}
});
- barrier.waitForBarrier("p2", new Runnable() {
+ barrier.waitForBarrier(ver, "p2", new Runnable() {
@Override
public void run() {
s.p2 = true;
@@ -140,20 +149,14 @@ public class TestZkBarrierForVersionUpgrade {
@Test
public void testZkBarrierForVersionUpgradeWithTimeOut() {
- ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
-
+ String barrierId = "b1";
String ver = "1";
List<String> processors = new ArrayList<String>();
processors.add("p1");
processors.add("p2");
processors.add("p3");
- ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer, ver, processors) {
- @Override
- protected long getBarrierTimeOutMs() {
- return 200;
- }
- };
+ BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(barrierId);
class Status {
boolean p1 = false;
@@ -162,16 +165,16 @@ public class TestZkBarrierForVersionUpgrade {
}
final Status s = new Status();
- barrier.start();
+ barrier.start(ver, processors);
- barrier.waitForBarrier("p1", new Runnable() {
+ barrier.waitForBarrier(ver, "p1", new Runnable() {
@Override
public void run() {
s.p1 = true;
}
});
- barrier.waitForBarrier("p2", new Runnable() {
+ barrier.waitForBarrier(ver, "p2", new Runnable() {
@Override
public void run() {
s.p2 = true;
@@ -179,7 +182,7 @@ public class TestZkBarrierForVersionUpgrade {
});
// this node will join "too late"
- barrier.waitForBarrier("p3", new Runnable() {
+ barrier.waitForBarrier(ver, "p3", new Runnable() {
@Override
public void run() {
TestZkUtils.sleepMs(300);
@@ -187,15 +190,6 @@ public class TestZkBarrierForVersionUpgrade {
}
});
Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 400));
- }
-
- private ZkUtils getZkUtilsWithNewClient() {
- ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
- return new ZkUtils(
- "1",
- KEY_BUILDER,
- ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
- CONNECTION_TIMEOUT_MS);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
index b56d279..ef271f0 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
@@ -60,6 +60,6 @@ public class TestZkKeyBuilder {
Assert.assertEquals("/test/jobModels", builder.getJobModelPathPrefix());
String version = "2";
Assert.assertEquals("/test/jobModels/" + version, builder.getJobModelPath(version));
- Assert.assertEquals("/test/versionBarriers", builder.getJobModelVersionBarrierPrefix());
+ Assert.assertEquals("/test/testBarrier/versionBarriers", builder.getJobModelVersionBarrierPrefix("testBarrier"));
}
}