You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 14:03:29 UTC

[45/50] [abbrv] git commit: Initial ZK integration with s4.

Initial ZK integration with s4.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/e049d653
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/e049d653
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/e049d653

Branch: refs/heads/piper
Commit: e049d653b88398e5dbe672feb51ac79c73114aff
Parents: 076e1bf
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Sat Oct 22 00:43:31 2011 -0700
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Sat Oct 22 00:43:31 2011 -0700

----------------------------------------------------------------------
 subprojects/s4-admin/s4-admin.gradle               |   24 -----
 .../main/java/org/apache/s4/admin/TaskSetup.java   |   60 -----------
 .../java/org/apache/s4/comm/tools/TaskSetup.java   |   60 +++++++++++
 .../apache/s4/comm/topology/AssignmentFromZK.java  |    5 +-
 .../apache/s4/comm/topology/TopologyFromZK.java    |    4 +-
 .../s4/comm/topology/AssignmentFromZKTest.java     |   40 ++++++++
 .../s4/comm/topology/TopologyFromZKTest.java       |   78 +++++++++++++++
 .../org/apache/s4/comm/topology/ZKBaseTest.java    |   46 +++++++++
 8 files changed, 229 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e049d653/subprojects/s4-admin/s4-admin.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-admin/s4-admin.gradle b/subprojects/s4-admin/s4-admin.gradle
deleted file mode 100644
index fa0c3c7..0000000
--- a/subprojects/s4-admin/s4-admin.gradle
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright 2010 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-description = 'Apis for adminstration of S4 cluster.'
- 
-dependencies {
-    compile project(":s4-base")
-    compile project(":s4-comm")
-    compile libraries.gson
-    compile libraries.zkclient
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e049d653/subprojects/s4-admin/src/main/java/org/apache/s4/admin/TaskSetup.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-admin/src/main/java/org/apache/s4/admin/TaskSetup.java b/subprojects/s4-admin/src/main/java/org/apache/s4/admin/TaskSetup.java
deleted file mode 100644
index 5b71120..0000000
--- a/subprojects/s4-admin/src/main/java/org/apache/s4/admin/TaskSetup.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.s4.admin;
-
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.ClusterNode;
-import org.apache.s4.comm.topology.TopologyFromZK;
-import org.apache.s4.comm.topology.ZNRecord;
-import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.comm.topology.ZkClient;
-
-public class TaskSetup {
-
-    private ZkClient zkclient;
-
-    public TaskSetup(String zookeeperAddress) {
-        zkclient = new ZkClient(zookeeperAddress);
-        zkclient.setZkSerializer(new ZNRecordSerializer());
-        zkclient.waitUntilConnected();
-    }
-
-    public void clean(String clusterName) {
-        zkclient.deleteRecursive("/" + clusterName);
-    }
-
-    public void setup(String clusterName, int tasks) {
-        zkclient.createPersistent("/" + clusterName + "/tasks", true);
-        zkclient.createPersistent("/" + clusterName + "/process", true);
-        zkclient.createPersistent("/" + clusterName, true);
-        for (int i = 0; i < tasks; i++) {
-            String taskId = "Task-" + i;
-            ZNRecord record = new ZNRecord(taskId);
-            record.setSimpleField("taskId", taskId);
-            record.setSimpleField("port", String.valueOf(1300 + i));
-            record.setSimpleField("partition", String.valueOf(i));
-            record.setSimpleField("cluster", clusterName);
-            zkclient.createPersistent("/" + clusterName + "/tasks/" + taskId,
-                    record);
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        TaskSetup taskSetup = new TaskSetup("localhost:2181");
-        String clusterName = "test-s4-cluster";
-        taskSetup.clean(clusterName);
-        taskSetup.setup(clusterName, 10);
-        String zookeeperAddress = "localhost:2181";
-        for (int i = 0; i < 10; i++) {
-            AssignmentFromZK assignmentFromZK = new AssignmentFromZK(
-                    clusterName, zookeeperAddress, 30000, 30000);
-            ClusterNode assignClusterNode = assignmentFromZK
-                    .assignClusterNode();
-            System.out.println(i+"-->"+assignClusterNode);
-        }
-        TopologyFromZK topologyFromZK=new TopologyFromZK(clusterName, zookeeperAddress, 30000, 30000);
-        Thread.sleep(3000);
-        Cluster topology = topologyFromZK.getTopology();
-        System.out.println(topology);
-        Thread.currentThread().join();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e049d653/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
new file mode 100644
index 0000000..15dff99
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -0,0 +1,60 @@
+package org.apache.s4.comm.tools;
+
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ClusterNode;
+import org.apache.s4.comm.topology.TopologyFromZK;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
+
+public class TaskSetup {
+
+    private ZkClient zkclient;
+
+    public TaskSetup(String zookeeperAddress) {
+        zkclient = new ZkClient(zookeeperAddress);
+        zkclient.setZkSerializer(new ZNRecordSerializer());
+        zkclient.waitUntilConnected();
+    }
+
+    public void clean(String clusterName) {
+        zkclient.deleteRecursive("/" + clusterName);
+    }
+
+    public void setup(String clusterName, int tasks) {
+        zkclient.createPersistent("/" + clusterName + "/tasks", true);
+        zkclient.createPersistent("/" + clusterName + "/process", true);
+        zkclient.createPersistent("/" + clusterName, true);
+        for (int i = 0; i < tasks; i++) {
+            String taskId = "Task-" + i;
+            ZNRecord record = new ZNRecord(taskId);
+            record.setSimpleField("taskId", taskId);
+            record.setSimpleField("port", String.valueOf(1300 + i));
+            record.setSimpleField("partition", String.valueOf(i));
+            record.setSimpleField("cluster", clusterName);
+            zkclient.createPersistent("/" + clusterName + "/tasks/" + taskId,
+                    record);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        TaskSetup taskSetup = new TaskSetup("localhost:2181");
+        String clusterName = "test-s4-cluster";
+        taskSetup.clean(clusterName);
+        taskSetup.setup(clusterName, 10);
+        String zookeeperAddress = "localhost:2181";
+        for (int i = 0; i < 10; i++) {
+            AssignmentFromZK assignmentFromZK = new AssignmentFromZK(
+                    clusterName, zookeeperAddress, 30000, 30000);
+            ClusterNode assignClusterNode = assignmentFromZK
+                    .assignClusterNode();
+            System.out.println(i+"-->"+assignClusterNode);
+        }
+        TopologyFromZK topologyFromZK=new TopologyFromZK(clusterName, zookeeperAddress, 30000, 30000);
+        Thread.sleep(3000);
+        Cluster topology = topologyFromZK.getTopology();
+        System.out.println(topology.getNodes().size());
+        Thread.currentThread().join();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e049d653/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 6fdb85f..3f03105 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -179,8 +179,9 @@ public class AssignmentFromZK implements Assignment, IZkChildListener,
                                 process);
 
                     } catch (Throwable e) {
-                        logger.warn("Exception trying to acquire task. This is warning and can be ignored"
-                                + e);
+                        logger.warn("Exception trying to acquire task:"
+                                + taskName
+                                + " This is warning and can be ignored. " + e);
                         // Any exception does not means we failed to acquire
                         // task because we might have acquired task but there
                         // was ZK connection loss

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e049d653/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
index d9f2874..009ce1f 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
@@ -79,12 +79,12 @@ public class TopologyFromZK implements Topology, IZkChildListener,
 
     @Override
     public void handleDataChange(String dataPath, Object data) throws Exception {
-
+        doProcess();
     }
 
     @Override
     public void handleDataDeleted(String dataPath) throws Exception {
-
+        doProcess();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e049d653/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
new file mode 100644
index 0000000..f7ffbdb
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
@@ -0,0 +1,40 @@
+package org.apache.s4.comm.topology;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.comm.tools.TaskSetup;
+
+public class AssignmentFromZKTest extends ZKBaseTest {
+
+    public void testAssignment() throws Exception {
+        TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
+        final String clusterName = "test-s4-cluster";
+        taskSetup.clean(clusterName);
+        taskSetup.setup(clusterName, 10);
+        final CountDownLatch latch = new CountDownLatch(10);
+        for (int i = 0; i < 10; i++) {
+            Runnable runnable = new Runnable() {
+
+                @Override
+                public void run() {
+                    AssignmentFromZK assignmentFromZK;
+                    try {
+                        assignmentFromZK = new AssignmentFromZK(clusterName,
+                                zookeeperAddress, 30000, 30000);
+                        ClusterNode assignClusterNode = assignmentFromZK
+                                .assignClusterNode();
+                        latch.countDown();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            Thread t = new Thread(runnable);
+            t.start();
+        }
+
+        boolean await = latch.await(3, TimeUnit.SECONDS);
+        assertEquals(true, await);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e049d653/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
new file mode 100644
index 0000000..5a23487
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
@@ -0,0 +1,78 @@
+package org.apache.s4.comm.topology;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.s4.comm.tools.TaskSetup;
+
+public class TopologyFromZKTest extends ZKBaseTest {
+
+    public void testAssignment() throws Exception {
+        TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
+        final String clusterName = "test-s4-cluster";
+        taskSetup.clean(clusterName);
+        taskSetup.setup(clusterName, 10);
+
+        final TopologyFromZK topologyFromZK = new TopologyFromZK(clusterName,
+                zookeeperAddress, 30000, 30000);
+        final Lock lock = new ReentrantLock();
+        final Condition signal = lock.newCondition();
+        TopologyChangeListener listener = new TopologyChangeListener() {
+
+            @Override
+            public void onChange() {
+                System.out
+                        .println("TopologyFromZKTest.testAssignment().new TopologyChangeListener() {...}.onChange()");
+                if (topologyFromZK.getTopology().getNodes().size() == 10) {
+                    lock.lock();
+                    try {
+                        signal.signalAll();
+                    } finally {
+                        lock.unlock();
+                    }
+
+                }
+
+            }
+        };
+        topologyFromZK.addListener(listener);
+        final CountDownLatch latch = new CountDownLatch(10);
+        for (int i = 0; i < 10; i++) {
+            Runnable runnable = new Runnable() {
+
+                @Override
+                public void run() {
+                    AssignmentFromZK assignmentFromZK;
+                    try {
+                        assignmentFromZK = new AssignmentFromZK(clusterName,
+                                zookeeperAddress, 30000, 30000);
+                        ClusterNode assignClusterNode = assignmentFromZK
+                                .assignClusterNode();
+                        latch.countDown();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            Thread t = new Thread(runnable);
+            t.start();
+        }
+
+        boolean await = latch.await(3, TimeUnit.SECONDS);
+        assertEquals(true, await);
+        boolean success = false;
+        lock.lock();
+        try {
+            success = signal.await(3, TimeUnit.SECONDS);
+        } finally {
+            lock.unlock();
+        }
+        assertEquals(true, success);
+        assertEquals(10, topologyFromZK.getTopology().getNodes().size());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e049d653/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
new file mode 100644
index 0000000..b80d0b5
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
@@ -0,0 +1,46 @@
+package org.apache.s4.comm.topology;
+
+import java.io.File;
+
+import junit.framework.TestCase;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+
+public class ZKBaseTest extends TestCase {
+    protected ZkServer zkServer = null;
+    protected ZkClient zkClient;
+    protected String zookeeperAddress;
+
+    @Override
+    public void setUp() {
+        String dataDir = System.getProperty("user.dir") + File.separator
+                + "tmp" + File.separator + "zookeeper" + File.separator
+                + "data";
+        String logDir = System.getProperty("user.dir") + File.separator + "tmp"
+                + File.separator + "zookeeper" + File.separator + "logs";
+        IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+
+            @Override
+            public void createDefaultNameSpace(ZkClient zkClient) {
+
+            }
+        };
+        int port = 3029;
+        zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
+        zkServer.start();
+        zkClient = zkServer.getZkClient();
+        zookeeperAddress = "localhost:" + port;
+
+    }
+    public void test(){
+        
+    }
+    @Override
+    protected void tearDown() throws Exception {
+        if (zkServer != null) {
+            zkServer.shutdown();
+        }
+    }
+}