You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by df...@apache.org on 2013/02/22 17:56:14 UTC
[3/3] git commit: S4 nodes will pickup the first avaialbe instance
name, no need to specify it
Updated Branches:
refs/heads/S4-110-new 0b93cae4b -> 59bcb94cc
S4 nodes will pickup the first avaialbe instance name, no need to specify it
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/59bcb94c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/59bcb94c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/59bcb94c
Branch: refs/heads/S4-110-new
Commit: 59bcb94cc727cb1600074e6da81330953aeaa566
Parents: 88b0acf
Author: Daniel Gómez Ferro <da...@yahoo-inc.com>
Authored: Fri Feb 22 18:55:20 2013 +0100
Committer: Daniel Gómez Ferro <da...@yahoo-inc.com>
Committed: Fri Feb 22 18:55:20 2013 +0100
----------------------------------------------------------------------
.../s4/comm/topology/AssignmentFromHelix.java | 46 +++++++++------
.../main/java/org/apache/s4/core/BaseModule.java | 18 +-----
.../java/org/apache/s4/core/S4HelixBootstrap.java | 27 +++++----
.../src/main/java/org/apache/s4/core/S4Node.java | 5 +-
.../core/moduleloader/ModuleLoaderTestUtils.java | 2 +-
.../java/org/apache/s4/fixtures/CoreTestUtils.java | 3 +-
.../org/apache/s4/wordcount/WordCountTest.java | 2 +-
7 files changed, 47 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59bcb94c/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
index 243faac..96fb88c 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
@@ -11,14 +11,12 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -26,6 +24,7 @@ import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.s4.comm.helix.S4HelixConstants;
import org.apache.s4.comm.helix.TaskStateModelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +54,7 @@ public class AssignmentFromHelix implements Assignment {
@Inject
public AssignmentFromHelix(@Named("s4.cluster.name") String clusterName,
- @Named("s4.instance.name") String instanceName, @Named("s4.cluster.zk_address") String zookeeperAddress)
+ @Named("s4.cluster.zk_address") String zookeeperAddress)
throws Exception {
this.taskStateModelFactory = new TaskStateModelFactory();
// this.appStateModelFactory = appStateModelFactory;
@@ -67,7 +66,7 @@ public class AssignmentFromHelix implements Assignment {
zkClient.setZkSerializer(new ZNRecordSerializer());
zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
- helixDataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
+ helixDataAccessor = new ZKHelixDataAccessor(S4HelixConstants.HELIX_CLUSTER_NAME, baseDataAccessor);
clusterNodeRef = new AtomicReference<ClusterNode>();
taskAcquired = lock.newCondition();
currentlyOwningTask = new AtomicBoolean(true);
@@ -77,14 +76,11 @@ public class AssignmentFromHelix implements Assignment {
logger.warn("Unable to get hostname", e);
machineId = "UNKNOWN";
}
- ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceName.split("_")[1]), machineId, instanceName);
- clusterNodeRef.set(node);
- currentlyOwningTask.set(true);
}
@Inject
public void init() {
- // joinCluster();
+ joinCluster();
}
@Override
@@ -103,7 +99,7 @@ public class AssignmentFromHelix implements Assignment {
return clusterNodeRef.get();
}
- public void joinClusterOld() {
+ public void joinCluster() {
lock.lock();
try {
Builder keyBuilder = helixDataAccessor.keyBuilder();
@@ -112,25 +108,33 @@ public class AssignmentFromHelix implements Assignment {
List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
for (InstanceConfig instanceConfig : instances) {
String instanceName = instanceConfig.getInstanceName();
- if (!liveInstances.contains(instanceName)) {
- zkHelixManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
+ if (liveInstances.contains(instanceName)) {
+ continue;
+ }
+ String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
+ if (!nodeGroup.equals(clusterName)) {
+ continue;
+ }
+
+ try {
+ zkHelixManager = HelixManagerFactory.getZKHelixManager(S4HelixConstants.HELIX_CLUSTER_NAME, instanceName,
InstanceType.PARTICIPANT, zookeeperAddress);
zkHelixManager.getStateMachineEngine().registerStateModelFactory("LeaderStandby",
taskStateModelFactory);
-
zkHelixManager.connect();
- ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceConfig.getPort()), machineId,
- instanceName);
+
+ ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceName.split("_")[1]), machineId, instanceName);
clusterNodeRef.set(node);
currentlyOwningTask.set(true);
taskAcquired.signalAll();
- break;
+ return;
+ } catch (Exception e) {
+ logger.error("Unexpected exception while trying to register with Helix, retrying", e);
}
}
- if (instances.size() == liveInstances.size()) {
- System.out.println("No more nodes can join the cluster. Will wait for some node to die.");
- Thread.sleep(100000);
- }
+ // TODO wait for notification from Helix instead
+ logger.warn("No more nodes can join the cluster. Will wait for some node to die.");
+ Thread.sleep(1000);
} while (!currentlyOwningTask.get());
System.out.println("Joined the cluster:" + clusterName + " as " + clusterNodeRef.get().getTaskId());
} catch (Exception e) {
@@ -140,4 +144,8 @@ public class AssignmentFromHelix implements Assignment {
}
}
+ public HelixManager getHelixManager() {
+ return zkHelixManager;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59bcb94c/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index b92c87b..a9604a7 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -6,6 +6,7 @@ import java.util.HashMap;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.helix.HelixManager;
import org.apache.s4.comm.helix.TaskStateModelFactory;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromHelix;
@@ -35,14 +36,12 @@ public class BaseModule extends AbstractModule {
private PropertiesConfiguration config;
InputStream baseConfigInputStream;
String clusterName;
- private final String instanceName;
boolean useHelix = false;
- public BaseModule(InputStream baseConfigInputStream, String clusterName, String instanceName, boolean useHelix) {
+ public BaseModule(InputStream baseConfigInputStream, String clusterName, boolean useHelix) {
super();
this.baseConfigInputStream = baseConfigInputStream;
this.clusterName = clusterName;
- this.instanceName = instanceName;
this.useHelix = useHelix;
}
@@ -102,19 +101,6 @@ public class BaseModule extends AbstractModule {
});
}
}
- if (instanceName != null) {
- if (config.containsKey("s4.instance.name")) {
- logger.warn(
- "instanceName [{}] passed as a parameter will not be used because an existing s4.instance.name parameter of value [{}] was found in the configuration file and will be used",
- instanceName, config.getProperty("s4.instance.name"));
- } else {
- Names.bindProperties(binder, new HashMap<String, String>() {
- {
- put("s4.instance.name", instanceName);
- }
- });
- }
- }
} catch (ConfigurationException e) {
binder.addError(e);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59bcb94c/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
index 3d58dbc..7731c4e 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
@@ -2,19 +2,26 @@ package org.apache.s4.core;
import java.net.Inet4Address;
import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.s4.comm.helix.S4HelixConstants;
import org.apache.s4.comm.helix.TaskStateModelFactory;
+import org.apache.s4.comm.topology.AssignmentFromHelix;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.util.ArchiveFetchException;
import org.apache.s4.comm.util.ArchiveFetcher;
@@ -57,33 +64,30 @@ public class S4HelixBootstrap implements Bootstrap {
private final String clusterName;
- private final String instanceName;
+ private String instanceName;
private final String zookeeperAddress;
- private final TaskStateModelFactory taskStateModelFactory;
private final AppStateModelFactory appStateModelFactory;
+ private final AssignmentFromHelix assignmentFromHelix;
private final Cluster cluster;
- private final Lock startingNode = new ReentrantLock();
-
public static Injector rootInjector;
@Inject
public S4HelixBootstrap(@Named("s4.cluster.name") String clusterName,
- @Named("s4.instance.name") String instanceName, @Named("s4.cluster.zk_address") String zookeeperAddress,
+ @Named("s4.cluster.zk_address") String zookeeperAddress,
@Named("s4.cluster.zk_session_timeout") int sessionTimeout,
@Named("s4.cluster.zk_connection_timeout") int connectionTimeout,
- AppStateModelFactory appStateModelFactory, TaskStateModelFactory taskStateModelFactory,
+ AppStateModelFactory appStateModelFactory, AssignmentFromHelix assignmentFromHelix,
ArchiveFetcher fetcher, Cluster cluster) {
this.clusterName = clusterName;
- this.instanceName = instanceName;
this.zookeeperAddress = zookeeperAddress;
- this.taskStateModelFactory = taskStateModelFactory;
this.appStateModelFactory = appStateModelFactory;
this.fetcher = fetcher;
this.cluster = cluster;
+ this.assignmentFromHelix = assignmentFromHelix;
}
@Override
@@ -96,19 +100,16 @@ public class S4HelixBootstrap implements Bootstrap {
HelixControllerMain.STANDALONE);
// this.parentInjector = parentInjector;
S4HelixBootstrap.rootInjector = parentInjector;
+
registerWithHelix();
signalOneAppLoaded.await();
}
private void registerWithHelix() {
- HelixManager helixManager;
+ HelixManager helixManager = assignmentFromHelix.getHelixManager();
try {
- helixManager = HelixManagerFactory.getZKHelixManager(S4HelixConstants.HELIX_CLUSTER_NAME, instanceName,
- InstanceType.PARTICIPANT, zookeeperAddress);
- helixManager.getStateMachineEngine().registerStateModelFactory("LeaderStandby", taskStateModelFactory);
helixManager.getStateMachineEngine().registerStateModelFactory("OnlineOffline", appStateModelFactory);
- helixManager.connect();
helixManager.addExternalViewChangeListener((RoutingTableProvider) cluster);
} catch (Exception e) {
// TODO Auto-generated catch block
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59bcb94c/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
index 1293daa..e815c80 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
@@ -50,7 +50,7 @@ public class S4Node {
Injector injector = Guice
.createInjector(new Module[] { new BaseModule(Resources.getResource("default.s4.base.properties")
- .openStream(), nodeArgs.clusterName, nodeArgs.instanceName, nodeArgs.useHelix) });
+ .openStream(), nodeArgs.clusterName, nodeArgs.useHelix) });
Bootstrap bootstrap = injector.getInstance(Bootstrap.class);
try {
bootstrap.start(injector);
@@ -75,9 +75,6 @@ public class S4Node {
@Parameter(names = "-zk", description = "Zookeeper connection string", required = false)
String zkConnectionString;
- @Parameter(names = { "-id", "-nodeId" }, description = "Node/Instance id that uniquely identifies a node", required = false)
- String instanceName = null;
-
@Parameter(names = "-helix", description = "Required flag when using a Helix based cluster manager", required = false, arity = 0)
boolean useHelix = false;
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59bcb94c/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
index c0e5bb7..68ab42b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
@@ -69,7 +69,7 @@ public class ModuleLoaderTestUtils {
}
Injector injector = Guice.createInjector(new BaseModule(Resources.getResource("default.s4.base.properties")
- .openStream(), "cluster1", null, false),
+ .openStream(), "cluster1", false),
new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()));
Emitter emitter = injector.getInstance(TCPEmitter.class);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59bcb94c/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 19f1a26..1f054cb 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -115,8 +115,7 @@ public class CoreTestUtils extends CommTestUtils {
public static Injector createInjectorWithNonFailFastZKClients() throws IOException {
return Guice.createInjector(Modules.override(
- new BaseModule(Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null,
- false),
+ new BaseModule(Resources.getResource("default.s4.base.properties").openStream(), "cluster1", false),
new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()),
new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream())).with(
new NonFailFastZookeeperClientsModule()));
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59bcb94c/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 69db25b..43c1651 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -60,7 +60,7 @@ public class WordCountTest extends ZkBasedTest {
public void createEmitter() throws IOException {
injector = Guice.createInjector(new BaseModule(
- Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null, false),
+ Resources.getResource("default.s4.base.properties").openStream(), "cluster1", false),
new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()),
new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));