You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 14:03:29 UTC
[45/50] [abbrv] git commit: Initial ZK integration with s4.
Initial ZK integration with s4.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/e049d653
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/e049d653
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/e049d653
Branch: refs/heads/piper
Commit: e049d653b88398e5dbe672feb51ac79c73114aff
Parents: 076e1bf
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Sat Oct 22 00:43:31 2011 -0700
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Sat Oct 22 00:43:31 2011 -0700
----------------------------------------------------------------------
subprojects/s4-admin/s4-admin.gradle | 24 -----
.../main/java/org/apache/s4/admin/TaskSetup.java | 60 -----------
.../java/org/apache/s4/comm/tools/TaskSetup.java | 60 +++++++++++
.../apache/s4/comm/topology/AssignmentFromZK.java | 5 +-
.../apache/s4/comm/topology/TopologyFromZK.java | 4 +-
.../s4/comm/topology/AssignmentFromZKTest.java | 40 ++++++++
.../s4/comm/topology/TopologyFromZKTest.java | 78 +++++++++++++++
.../org/apache/s4/comm/topology/ZKBaseTest.java | 46 +++++++++
8 files changed, 229 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e049d653/subprojects/s4-admin/s4-admin.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-admin/s4-admin.gradle b/subprojects/s4-admin/s4-admin.gradle
deleted file mode 100644
index fa0c3c7..0000000
--- a/subprojects/s4-admin/s4-admin.gradle
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright 2010 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-description = 'Apis for adminstration of S4 cluster.'
-
-dependencies {
- compile project(":s4-base")
- compile project(":s4-comm")
- compile libraries.gson
- compile libraries.zkclient
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e049d653/subprojects/s4-admin/src/main/java/org/apache/s4/admin/TaskSetup.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-admin/src/main/java/org/apache/s4/admin/TaskSetup.java b/subprojects/s4-admin/src/main/java/org/apache/s4/admin/TaskSetup.java
deleted file mode 100644
index 5b71120..0000000
--- a/subprojects/s4-admin/src/main/java/org/apache/s4/admin/TaskSetup.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.s4.admin;
-
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.ClusterNode;
-import org.apache.s4.comm.topology.TopologyFromZK;
-import org.apache.s4.comm.topology.ZNRecord;
-import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.comm.topology.ZkClient;
-
-public class TaskSetup {
-
- private ZkClient zkclient;
-
- public TaskSetup(String zookeeperAddress) {
- zkclient = new ZkClient(zookeeperAddress);
- zkclient.setZkSerializer(new ZNRecordSerializer());
- zkclient.waitUntilConnected();
- }
-
- public void clean(String clusterName) {
- zkclient.deleteRecursive("/" + clusterName);
- }
-
- public void setup(String clusterName, int tasks) {
- zkclient.createPersistent("/" + clusterName + "/tasks", true);
- zkclient.createPersistent("/" + clusterName + "/process", true);
- zkclient.createPersistent("/" + clusterName, true);
- for (int i = 0; i < tasks; i++) {
- String taskId = "Task-" + i;
- ZNRecord record = new ZNRecord(taskId);
- record.setSimpleField("taskId", taskId);
- record.setSimpleField("port", String.valueOf(1300 + i));
- record.setSimpleField("partition", String.valueOf(i));
- record.setSimpleField("cluster", clusterName);
- zkclient.createPersistent("/" + clusterName + "/tasks/" + taskId,
- record);
- }
- }
-
- public static void main(String[] args) throws Exception {
- TaskSetup taskSetup = new TaskSetup("localhost:2181");
- String clusterName = "test-s4-cluster";
- taskSetup.clean(clusterName);
- taskSetup.setup(clusterName, 10);
- String zookeeperAddress = "localhost:2181";
- for (int i = 0; i < 10; i++) {
- AssignmentFromZK assignmentFromZK = new AssignmentFromZK(
- clusterName, zookeeperAddress, 30000, 30000);
- ClusterNode assignClusterNode = assignmentFromZK
- .assignClusterNode();
- System.out.println(i+"-->"+assignClusterNode);
- }
- TopologyFromZK topologyFromZK=new TopologyFromZK(clusterName, zookeeperAddress, 30000, 30000);
- Thread.sleep(3000);
- Cluster topology = topologyFromZK.getTopology();
- System.out.println(topology);
- Thread.currentThread().join();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e049d653/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
new file mode 100644
index 0000000..15dff99
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -0,0 +1,60 @@
+package org.apache.s4.comm.tools;
+
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ClusterNode;
+import org.apache.s4.comm.topology.TopologyFromZK;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
+
+public class TaskSetup {
+
+ private ZkClient zkclient;
+
+ public TaskSetup(String zookeeperAddress) {
+ zkclient = new ZkClient(zookeeperAddress);
+ zkclient.setZkSerializer(new ZNRecordSerializer());
+ zkclient.waitUntilConnected();
+ }
+
+ public void clean(String clusterName) {
+ zkclient.deleteRecursive("/" + clusterName);
+ }
+
+ public void setup(String clusterName, int tasks) {
+ zkclient.createPersistent("/" + clusterName + "/tasks", true);
+ zkclient.createPersistent("/" + clusterName + "/process", true);
+ zkclient.createPersistent("/" + clusterName, true);
+ for (int i = 0; i < tasks; i++) {
+ String taskId = "Task-" + i;
+ ZNRecord record = new ZNRecord(taskId);
+ record.setSimpleField("taskId", taskId);
+ record.setSimpleField("port", String.valueOf(1300 + i));
+ record.setSimpleField("partition", String.valueOf(i));
+ record.setSimpleField("cluster", clusterName);
+ zkclient.createPersistent("/" + clusterName + "/tasks/" + taskId,
+ record);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ TaskSetup taskSetup = new TaskSetup("localhost:2181");
+ String clusterName = "test-s4-cluster";
+ taskSetup.clean(clusterName);
+ taskSetup.setup(clusterName, 10);
+ String zookeeperAddress = "localhost:2181";
+ for (int i = 0; i < 10; i++) {
+ AssignmentFromZK assignmentFromZK = new AssignmentFromZK(
+ clusterName, zookeeperAddress, 30000, 30000);
+ ClusterNode assignClusterNode = assignmentFromZK
+ .assignClusterNode();
+ System.out.println(i+"-->"+assignClusterNode);
+ }
+ TopologyFromZK topologyFromZK=new TopologyFromZK(clusterName, zookeeperAddress, 30000, 30000);
+ Thread.sleep(3000);
+ Cluster topology = topologyFromZK.getTopology();
+ System.out.println(topology.getNodes().size());
+ Thread.currentThread().join();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e049d653/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 6fdb85f..3f03105 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -179,8 +179,9 @@ public class AssignmentFromZK implements Assignment, IZkChildListener,
process);
} catch (Throwable e) {
- logger.warn("Exception trying to acquire task. This is warning and can be ignored"
- + e);
+ logger.warn("Exception trying to acquire task:"
+ + taskName
+ + " This is warning and can be ignored. " + e);
// Any exception does not means we failed to acquire
// task because we might have acquired task but there
// was ZK connection loss
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e049d653/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
index d9f2874..009ce1f 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
@@ -79,12 +79,12 @@ public class TopologyFromZK implements Topology, IZkChildListener,
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
-
+ doProcess();
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
-
+ doProcess();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e049d653/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
new file mode 100644
index 0000000..f7ffbdb
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
@@ -0,0 +1,40 @@
+package org.apache.s4.comm.topology;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.comm.tools.TaskSetup;
+
+public class AssignmentFromZKTest extends ZKBaseTest {
+
+ public void testAssignment() throws Exception {
+ TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
+ final String clusterName = "test-s4-cluster";
+ taskSetup.clean(clusterName);
+ taskSetup.setup(clusterName, 10);
+ final CountDownLatch latch = new CountDownLatch(10);
+ for (int i = 0; i < 10; i++) {
+ Runnable runnable = new Runnable() {
+
+ @Override
+ public void run() {
+ AssignmentFromZK assignmentFromZK;
+ try {
+ assignmentFromZK = new AssignmentFromZK(clusterName,
+ zookeeperAddress, 30000, 30000);
+ ClusterNode assignClusterNode = assignmentFromZK
+ .assignClusterNode();
+ latch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ Thread t = new Thread(runnable);
+ t.start();
+ }
+
+ boolean await = latch.await(3, TimeUnit.SECONDS);
+ assertEquals(true, await);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e049d653/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
new file mode 100644
index 0000000..5a23487
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
@@ -0,0 +1,78 @@
+package org.apache.s4.comm.topology;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.s4.comm.tools.TaskSetup;
+
+public class TopologyFromZKTest extends ZKBaseTest {
+
+ public void testAssignment() throws Exception {
+ TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
+ final String clusterName = "test-s4-cluster";
+ taskSetup.clean(clusterName);
+ taskSetup.setup(clusterName, 10);
+
+ final TopologyFromZK topologyFromZK = new TopologyFromZK(clusterName,
+ zookeeperAddress, 30000, 30000);
+ final Lock lock = new ReentrantLock();
+ final Condition signal = lock.newCondition();
+ TopologyChangeListener listener = new TopologyChangeListener() {
+
+ @Override
+ public void onChange() {
+ System.out
+ .println("TopologyFromZKTest.testAssignment().new TopologyChangeListener() {...}.onChange()");
+ if (topologyFromZK.getTopology().getNodes().size() == 10) {
+ lock.lock();
+ try {
+ signal.signalAll();
+ } finally {
+ lock.unlock();
+ }
+
+ }
+
+ }
+ };
+ topologyFromZK.addListener(listener);
+ final CountDownLatch latch = new CountDownLatch(10);
+ for (int i = 0; i < 10; i++) {
+ Runnable runnable = new Runnable() {
+
+ @Override
+ public void run() {
+ AssignmentFromZK assignmentFromZK;
+ try {
+ assignmentFromZK = new AssignmentFromZK(clusterName,
+ zookeeperAddress, 30000, 30000);
+ ClusterNode assignClusterNode = assignmentFromZK
+ .assignClusterNode();
+ latch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ Thread t = new Thread(runnable);
+ t.start();
+ }
+
+ boolean await = latch.await(3, TimeUnit.SECONDS);
+ assertEquals(true, await);
+ boolean success = false;
+ lock.lock();
+ try {
+ success = signal.await(3, TimeUnit.SECONDS);
+ } finally {
+ lock.unlock();
+ }
+ assertEquals(true, success);
+ assertEquals(10, topologyFromZK.getTopology().getNodes().size());
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e049d653/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
new file mode 100644
index 0000000..b80d0b5
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
@@ -0,0 +1,46 @@
+package org.apache.s4.comm.topology;
+
+import java.io.File;
+
+import junit.framework.TestCase;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+
+public class ZKBaseTest extends TestCase {
+ protected ZkServer zkServer = null;
+ protected ZkClient zkClient;
+ protected String zookeeperAddress;
+
+ @Override
+ public void setUp() {
+ String dataDir = System.getProperty("user.dir") + File.separator
+ + "tmp" + File.separator + "zookeeper" + File.separator
+ + "data";
+ String logDir = System.getProperty("user.dir") + File.separator + "tmp"
+ + File.separator + "zookeeper" + File.separator + "logs";
+ IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+
+ @Override
+ public void createDefaultNameSpace(ZkClient zkClient) {
+
+ }
+ };
+ int port = 3029;
+ zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
+ zkServer.start();
+ zkClient = zkServer.getZkClient();
+ zookeeperAddress = "localhost:" + port;
+
+ }
+ public void test(){
+
+ }
+ @Override
+ protected void tearDown() throws Exception {
+ if (zkServer != null) {
+ zkServer.shutdown();
+ }
+ }
+}