You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by df...@apache.org on 2013/02/22 17:56:14 UTC

[3/3] git commit: S4 nodes will pick up the first available instance name; no need to specify it

Updated Branches:
  refs/heads/S4-110-new 0b93cae4b -> 59bcb94cc


S4 nodes will pick up the first available instance name; no need to specify it


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/59bcb94c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/59bcb94c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/59bcb94c

Branch: refs/heads/S4-110-new
Commit: 59bcb94cc727cb1600074e6da81330953aeaa566
Parents: 88b0acf
Author: Daniel Gómez Ferro <da...@yahoo-inc.com>
Authored: Fri Feb 22 18:55:20 2013 +0100
Committer: Daniel Gómez Ferro <da...@yahoo-inc.com>
Committed: Fri Feb 22 18:55:20 2013 +0100

----------------------------------------------------------------------
 .../s4/comm/topology/AssignmentFromHelix.java      |   46 +++++++++------
 .../main/java/org/apache/s4/core/BaseModule.java   |   18 +-----
 .../java/org/apache/s4/core/S4HelixBootstrap.java  |   27 +++++----
 .../src/main/java/org/apache/s4/core/S4Node.java   |    5 +-
 .../core/moduleloader/ModuleLoaderTestUtils.java   |    2 +-
 .../java/org/apache/s4/fixtures/CoreTestUtils.java |    3 +-
 .../org/apache/s4/wordcount/WordCountTest.java     |    2 +-
 7 files changed, 47 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59bcb94c/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
index 243faac..96fb88c 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
@@ -11,14 +11,12 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -26,6 +24,7 @@ import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.s4.comm.helix.S4HelixConstants;
 import org.apache.s4.comm.helix.TaskStateModelFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,7 +54,7 @@ public class AssignmentFromHelix implements Assignment {
 
     @Inject
     public AssignmentFromHelix(@Named("s4.cluster.name") String clusterName,
-            @Named("s4.instance.name") String instanceName, @Named("s4.cluster.zk_address") String zookeeperAddress)
+            @Named("s4.cluster.zk_address") String zookeeperAddress)
             throws Exception {
         this.taskStateModelFactory = new TaskStateModelFactory();
         // this.appStateModelFactory = appStateModelFactory;
@@ -67,7 +66,7 @@ public class AssignmentFromHelix implements Assignment {
         zkClient.setZkSerializer(new ZNRecordSerializer());
         zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
         BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
-        helixDataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
+        helixDataAccessor = new ZKHelixDataAccessor(S4HelixConstants.HELIX_CLUSTER_NAME, baseDataAccessor);
         clusterNodeRef = new AtomicReference<ClusterNode>();
         taskAcquired = lock.newCondition();
         currentlyOwningTask = new AtomicBoolean(true);
@@ -77,14 +76,11 @@ public class AssignmentFromHelix implements Assignment {
             logger.warn("Unable to get hostname", e);
             machineId = "UNKNOWN";
         }
-        ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceName.split("_")[1]), machineId, instanceName);
-        clusterNodeRef.set(node);
-        currentlyOwningTask.set(true);
     }
 
     @Inject
     public void init() {
-        // joinCluster();
+         joinCluster();
     }
 
     @Override
@@ -103,7 +99,7 @@ public class AssignmentFromHelix implements Assignment {
         return clusterNodeRef.get();
     }
 
-    public void joinClusterOld() {
+    public void joinCluster() {
         lock.lock();
         try {
             Builder keyBuilder = helixDataAccessor.keyBuilder();
@@ -112,25 +108,33 @@ public class AssignmentFromHelix implements Assignment {
                 List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
                 for (InstanceConfig instanceConfig : instances) {
                     String instanceName = instanceConfig.getInstanceName();
-                    if (!liveInstances.contains(instanceName)) {
-                        zkHelixManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
+                    if (liveInstances.contains(instanceName)) {
+                        continue;
+                    }
+                    String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
+                    if (!nodeGroup.equals(clusterName)) {
+                        continue;
+                    }
+
+                    try {
+                        zkHelixManager = HelixManagerFactory.getZKHelixManager(S4HelixConstants.HELIX_CLUSTER_NAME, instanceName,
                                 InstanceType.PARTICIPANT, zookeeperAddress);
                         zkHelixManager.getStateMachineEngine().registerStateModelFactory("LeaderStandby",
                                 taskStateModelFactory);
-
                         zkHelixManager.connect();
-                        ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceConfig.getPort()), machineId,
-                                instanceName);
+
+                        ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceName.split("_")[1]), machineId, instanceName);
                         clusterNodeRef.set(node);
                         currentlyOwningTask.set(true);
                         taskAcquired.signalAll();
-                        break;
+                        return;
+                    } catch (Exception e) {
+                        logger.error("Unexpected exception while trying to register with Helix, retrying", e);
                     }
                 }
-                if (instances.size() == liveInstances.size()) {
-                    System.out.println("No more nodes can join the cluster. Will wait for some node to die.");
-                    Thread.sleep(100000);
-                }
+                // TODO wait for notification from Helix instead
+                logger.warn("No more nodes can join the cluster. Will wait for some node to die.");
+                Thread.sleep(1000);
             } while (!currentlyOwningTask.get());
             System.out.println("Joined the cluster:" + clusterName + " as " + clusterNodeRef.get().getTaskId());
         } catch (Exception e) {
@@ -140,4 +144,8 @@ public class AssignmentFromHelix implements Assignment {
         }
     }
 
+    public HelixManager getHelixManager() {
+        return zkHelixManager;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59bcb94c/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index b92c87b..a9604a7 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -6,6 +6,7 @@ import java.util.HashMap;
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.helix.HelixManager;
 import org.apache.s4.comm.helix.TaskStateModelFactory;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.AssignmentFromHelix;
@@ -35,14 +36,12 @@ public class BaseModule extends AbstractModule {
     private PropertiesConfiguration config;
     InputStream baseConfigInputStream;
     String clusterName;
-    private final String instanceName;
     boolean useHelix = false;
 
-    public BaseModule(InputStream baseConfigInputStream, String clusterName, String instanceName, boolean useHelix) {
+    public BaseModule(InputStream baseConfigInputStream, String clusterName, boolean useHelix) {
         super();
         this.baseConfigInputStream = baseConfigInputStream;
         this.clusterName = clusterName;
-        this.instanceName = instanceName;
         this.useHelix = useHelix;
     }
 
@@ -102,19 +101,6 @@ public class BaseModule extends AbstractModule {
                     });
                 }
             }
-            if (instanceName != null) {
-                if (config.containsKey("s4.instance.name")) {
-                    logger.warn(
-                            "instanceName [{}] passed as a parameter will not be used because an existing s4.instance.name parameter of value [{}] was found in the configuration file and will be used",
-                            instanceName, config.getProperty("s4.instance.name"));
-                } else {
-                    Names.bindProperties(binder, new HashMap<String, String>() {
-                        {
-                            put("s4.instance.name", instanceName);
-                        }
-                    });
-                }
-            }
 
         } catch (ConfigurationException e) {
             binder.addError(e);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59bcb94c/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
index 3d58dbc..7731c4e 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
@@ -2,19 +2,26 @@ package org.apache.s4.core;
 
 import java.net.Inet4Address;
 import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.spectator.RoutingTableProvider;
 import org.apache.s4.comm.helix.S4HelixConstants;
 import org.apache.s4.comm.helix.TaskStateModelFactory;
+import org.apache.s4.comm.topology.AssignmentFromHelix;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.util.ArchiveFetchException;
 import org.apache.s4.comm.util.ArchiveFetcher;
@@ -57,33 +64,30 @@ public class S4HelixBootstrap implements Bootstrap {
 
     private final String clusterName;
 
-    private final String instanceName;
+    private String instanceName;
 
     private final String zookeeperAddress;
-    private final TaskStateModelFactory taskStateModelFactory;
 
     private final AppStateModelFactory appStateModelFactory;
+    private final AssignmentFromHelix assignmentFromHelix;
 
     private final Cluster cluster;
 
-    private final Lock startingNode = new ReentrantLock();
-
     public static Injector rootInjector;
 
     @Inject
     public S4HelixBootstrap(@Named("s4.cluster.name") String clusterName,
-            @Named("s4.instance.name") String instanceName, @Named("s4.cluster.zk_address") String zookeeperAddress,
+            @Named("s4.cluster.zk_address") String zookeeperAddress,
             @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
             @Named("s4.cluster.zk_connection_timeout") int connectionTimeout,
-            AppStateModelFactory appStateModelFactory, TaskStateModelFactory taskStateModelFactory,
+            AppStateModelFactory appStateModelFactory, AssignmentFromHelix assignmentFromHelix,
             ArchiveFetcher fetcher, Cluster cluster) {
         this.clusterName = clusterName;
-        this.instanceName = instanceName;
         this.zookeeperAddress = zookeeperAddress;
-        this.taskStateModelFactory = taskStateModelFactory;
         this.appStateModelFactory = appStateModelFactory;
         this.fetcher = fetcher;
         this.cluster = cluster;
+        this.assignmentFromHelix = assignmentFromHelix;
     }
 
     @Override
@@ -96,19 +100,16 @@ public class S4HelixBootstrap implements Bootstrap {
                 HelixControllerMain.STANDALONE);
         // this.parentInjector = parentInjector;
         S4HelixBootstrap.rootInjector = parentInjector;
+        
         registerWithHelix();
 
         signalOneAppLoaded.await();
     }
 
     private void registerWithHelix() {
-        HelixManager helixManager;
+        HelixManager helixManager = assignmentFromHelix.getHelixManager();
         try {
-            helixManager = HelixManagerFactory.getZKHelixManager(S4HelixConstants.HELIX_CLUSTER_NAME, instanceName,
-                    InstanceType.PARTICIPANT, zookeeperAddress);
-            helixManager.getStateMachineEngine().registerStateModelFactory("LeaderStandby", taskStateModelFactory);
             helixManager.getStateMachineEngine().registerStateModelFactory("OnlineOffline", appStateModelFactory);
-            helixManager.connect();
             helixManager.addExternalViewChangeListener((RoutingTableProvider) cluster);
         } catch (Exception e) {
             // TODO Auto-generated catch block

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59bcb94c/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
index 1293daa..e815c80 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
@@ -50,7 +50,7 @@ public class S4Node {
 
         Injector injector = Guice
                 .createInjector(new Module[] { new BaseModule(Resources.getResource("default.s4.base.properties")
-                        .openStream(), nodeArgs.clusterName, nodeArgs.instanceName, nodeArgs.useHelix) });
+                        .openStream(), nodeArgs.clusterName, nodeArgs.useHelix) });
         Bootstrap bootstrap = injector.getInstance(Bootstrap.class);
         try {
             bootstrap.start(injector);
@@ -75,9 +75,6 @@ public class S4Node {
         @Parameter(names = "-zk", description = "Zookeeper connection string", required = false)
         String zkConnectionString;
 
-        @Parameter(names = { "-id", "-nodeId" }, description = "Node/Instance id that uniquely identifies a node", required = false)
-        String instanceName = null;
-
         @Parameter(names = "-helix", description = "Required flag when using a Helix based cluster manager", required = false, arity = 0)
         boolean useHelix = false;
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59bcb94c/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
index c0e5bb7..68ab42b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
@@ -69,7 +69,7 @@ public class ModuleLoaderTestUtils {
         }
 
         Injector injector = Guice.createInjector(new BaseModule(Resources.getResource("default.s4.base.properties")
-                .openStream(), "cluster1", null, false),
+                .openStream(), "cluster1", false),
                 new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()));
 
         Emitter emitter = injector.getInstance(TCPEmitter.class);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59bcb94c/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 19f1a26..1f054cb 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -115,8 +115,7 @@ public class CoreTestUtils extends CommTestUtils {
 
     public static Injector createInjectorWithNonFailFastZKClients() throws IOException {
         return Guice.createInjector(Modules.override(
-                new BaseModule(Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null,
-                        false),
+                new BaseModule(Resources.getResource("default.s4.base.properties").openStream(), "cluster1", false),
                 new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()),
                 new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream())).with(
                 new NonFailFastZookeeperClientsModule()));

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59bcb94c/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 69db25b..43c1651 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -60,7 +60,7 @@ public class WordCountTest extends ZkBasedTest {
 
     public void createEmitter() throws IOException {
         injector = Guice.createInjector(new BaseModule(
-                Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null, false),
+                Resources.getResource("default.s4.base.properties").openStream(), "cluster1", false),
                 new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()),
                 new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));