You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/11/12 02:18:59 UTC
[1/2] git commit: [HELIX-296] HelixConnection in 0.7.0 does not
remove LiveInstance znode
Updated Branches:
refs/heads/master 55c935169 -> 7c444e506
[HELIX-296] HelixConnection in 0.7.0 does not remove LiveInstance znode
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/72d879ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/72d879ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/72d879ed
Branch: refs/heads/master
Commit: 72d879edfced69f5ca9af6585592caa891d2b0c7
Parents: ec36112
Author: zzhang <zz...@apache.org>
Authored: Mon Nov 11 17:18:27 2013 -0800
Committer: zzhang <zz...@apache.org>
Committed: Mon Nov 11 17:18:27 2013 -0800
----------------------------------------------------------------------
.../org/apache/helix/HelixAutoController.java | 19 +
.../java/org/apache/helix/HelixConnection.java | 27 ++
.../helix/HelixConnectionStateListener.java | 19 +
.../java/org/apache/helix/HelixController.java | 19 +
.../java/org/apache/helix/HelixParticipant.java | 19 +
.../main/java/org/apache/helix/HelixRole.java | 19 +
.../java/org/apache/helix/HelixService.java | 19 +
.../manager/zk/HelixConnectionAdaptor.java | 19 +
.../helix/manager/zk/ZkCallbackHandler.java | 460 +++++++++++++++++++
.../org/apache/helix/manager/zk/ZkClient.java | 7 +-
.../helix/manager/zk/ZkHelixAutoController.java | 19 +
.../helix/manager/zk/ZkHelixConnection.java | 56 ++-
.../helix/manager/zk/ZkHelixController.java | 19 +
.../helix/manager/zk/ZkHelixLeaderElection.java | 37 +-
.../helix/manager/zk/ZkHelixParticipant.java | 23 +
.../apache/helix/monitoring/StatusDumpTask.java | 19 +
.../statemachine/HelixStateModelFactory.java | 19 +
.../HelixStateModelFactoryAdaptor.java | 19 +
.../helix/integration/TestHelixConnection.java | 20 +
.../manager/zk/TestZkHelixAutoController.java | 120 +++++
.../helix/manager/zk/TestZkHelixController.java | 160 +++++++
.../manager/zk/TestZkHelixParticipant.java | 103 +++++
.../helix/examples/LogicalModelExample.java | 3 +-
23 files changed, 1222 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAutoController.java b/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
index 7ad9218..91ec809 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
@@ -1,5 +1,24 @@
package org.apache.helix;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import org.apache.helix.api.id.ControllerId;
import org.apache.helix.participant.StateMachineEngine;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/HelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConnection.java b/helix-core/src/main/java/org/apache/helix/HelixConnection.java
index 7551673..9d93c97 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConnection.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConnection.java
@@ -1,5 +1,24 @@
package org.apache.helix;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import org.apache.helix.api.accessor.ClusterAccessor;
import org.apache.helix.api.accessor.ParticipantAccessor;
import org.apache.helix.api.accessor.ResourceAccessor;
@@ -61,6 +80,14 @@ public interface HelixConnection {
HelixController createController(ClusterId clusterId, ControllerId controllerId);
/**
+ * create an autonomous helix-controller
+ * @param clusterId
+ * @param controllerId
+ * @return
+ */
+ HelixAutoController createAutoController(ClusterId clusterId, ControllerId controllerId);
+
+ /**
* create a cluster-accessor
* @param clusterId
* @return cluster-accessor
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java b/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java
index 13172d0..044a3a7 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java
@@ -1,5 +1,24 @@
package org.apache.helix;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
public interface HelixConnectionStateListener {
/**
* called after connection is established
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/HelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixController.java b/helix-core/src/main/java/org/apache/helix/HelixController.java
index ce47e3d..9565cfb 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixController.java
@@ -1,5 +1,24 @@
package org.apache.helix;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import org.apache.helix.api.id.ControllerId;
public interface HelixController extends HelixRole, HelixService, HelixConnectionStateListener {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/HelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixParticipant.java b/helix-core/src/main/java/org/apache/helix/HelixParticipant.java
index 9002b15..678da4a 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixParticipant.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixParticipant.java
@@ -1,5 +1,24 @@
package org.apache.helix;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.participant.StateMachineEngine;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/HelixRole.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixRole.java b/helix-core/src/main/java/org/apache/helix/HelixRole.java
index 9e112d1..ffcb700 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixRole.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixRole.java
@@ -1,5 +1,24 @@
package org.apache.helix;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.Id;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/HelixService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixService.java b/helix-core/src/main/java/org/apache/helix/HelixService.java
index 33cc8e5..a1ce0ec 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixService.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixService.java
@@ -1,5 +1,24 @@
package org.apache.helix;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
/**
* Operational methods of a helix role
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
index 869563a..487c3a6 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
@@ -1,5 +1,24 @@
package org.apache.helix.manager.zk;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.ConfigChangeListener;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java
new file mode 100644
index 0000000..e0a0b33
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java
@@ -0,0 +1,460 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.apache.helix.HelixConstants.ChangeType.CONFIG;
+import static org.apache.helix.HelixConstants.ChangeType.CURRENT_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
+import static org.apache.helix.HelixConstants.ChangeType.IDEAL_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
+import static org.apache.helix.HelixConstants.ChangeType.MESSAGE;
+import static org.apache.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.HelixRole;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceConfigChangeListener;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.NotificationContext.Type;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.ScopedConfigChangeListener;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.EventType;
+
+/**
+ * This is a copy of {@link CallbackHandler}. We need to synchronize on ZkHelixConnection
+ * instead of HelixManager to avoid dead-lock.
+ * Otherwise an example deadlock scenario would be:
+ * 1) main-thread calls ZkHelixConnection#disconnect(), results in:
+ * - ZkHelixController#reset(), holding ZkHelixConnection, waiting HelixConnectionAdaptor
+ * 2) zk-event-thread calls CallbackHandler#handleChildChange(), results in:
+ * - CallbackHandler#invoke(), holding HelixConnectionAdaptor, waiting ZkHelixConnection
+ * TODO remove code duplication
+ */
+public class ZkCallbackHandler implements IZkChildListener, IZkDataListener
+
+{
+ private static Logger logger = Logger.getLogger(ZkCallbackHandler.class);
+
+ /**
+ * define the next possible notification types
+ */
+ private static Map<Type, List<Type>> nextNotificationType = new HashMap<Type, List<Type>>();
+ static {
+ nextNotificationType.put(Type.INIT, Arrays.asList(Type.CALLBACK, Type.FINALIZE));
+ nextNotificationType.put(Type.CALLBACK, Arrays.asList(Type.CALLBACK, Type.FINALIZE));
+ nextNotificationType.put(Type.FINALIZE, Arrays.asList(Type.INIT));
+ }
+
+ private final String _path;
+ private final Object _listener;
+ private final EventType[] _eventTypes;
+
+ private final ChangeType _changeType;
+ private final ZkClient _zkClient;
+ private final AtomicLong _lastNotificationTimeStamp;
+
+ private final HelixRole _role;
+ private final HelixManager _manager;
+ private final String _instanceName;
+ private final HelixConnection _connection;
+ private final HelixDataAccessor _accessor;
+
+ private final PropertyKey _propertyKey;
+
+ /**
+ * maintain the expected notification types
+ * this is a fix for HELIX-195: race condition between FINALIZE callbacks and Zk callbacks
+ */
+ private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE);
+
+ public ZkCallbackHandler(HelixRole role, ZkClient client,
+ PropertyKey propertyKey,
+ Object listener, EventType[] eventTypes, ChangeType changeType) {
+ if (listener == null) {
+ throw new HelixException("listener could not be null");
+ }
+
+ _role = role;
+ _manager = new HelixConnectionAdaptor(role);
+ _instanceName = role.getId().stringify();
+ _connection = role.getConnection();
+ _accessor = _connection.createDataAccessor(role.getClusterId());
+ _zkClient = client;
+ _propertyKey = propertyKey;
+ _path = propertyKey.getPath();
+ _listener = listener;
+ _eventTypes = eventTypes;
+ _changeType = changeType;
+ _lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
+ init();
+ }
+
+ public Object getListener() {
+ return _listener;
+ }
+
+ public String getPath() {
+ return _path;
+ }
+
+ public void invoke(NotificationContext changeContext) throws Exception {
+ // This allows the listener to work with one change at a time
+ synchronized (_connection) {
+ Type type = changeContext.getType();
+ if (!_expectTypes.contains(type)) {
+ logger.warn("Skip processing callbacks for listener: " + _listener + ", path: " + _path
+ + ", expected types: " + _expectTypes + " but was " + type);
+ return;
+ }
+ _expectTypes = nextNotificationType.get(type);
+
+ // Builder keyBuilder = _accessor.keyBuilder();
+ long start = System.currentTimeMillis();
+ if (logger.isInfoEnabled()) {
+ logger.info(Thread.currentThread().getId() + " START:INVOKE " + _path + " listener:"
+ + _listener.getClass().getCanonicalName());
+ }
+
+ if (_changeType == IDEAL_STATE) {
+
+ IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener) _listener;
+ subscribeForChanges(changeContext, _path, true, true);
+ List<IdealState> idealStates = _accessor.getChildValues(_propertyKey);
+
+ idealStateChangeListener.onIdealStateChange(idealStates, changeContext);
+
+ } else if (_changeType == ChangeType.INSTANCE_CONFIG) {
+ subscribeForChanges(changeContext, _path, true, true);
+ if (_listener instanceof ConfigChangeListener) {
+ ConfigChangeListener configChangeListener = (ConfigChangeListener) _listener;
+ List<InstanceConfig> configs = _accessor.getChildValues(_propertyKey);
+ configChangeListener.onConfigChange(configs, changeContext);
+ } else if (_listener instanceof InstanceConfigChangeListener) {
+ InstanceConfigChangeListener listener = (InstanceConfigChangeListener) _listener;
+ List<InstanceConfig> configs = _accessor.getChildValues(_propertyKey);
+ listener.onInstanceConfigChange(configs, changeContext);
+ }
+ } else if (_changeType == CONFIG) {
+ subscribeForChanges(changeContext, _path, true, true);
+ ScopedConfigChangeListener listener = (ScopedConfigChangeListener) _listener;
+ List<HelixProperty> configs = _accessor.getChildValues(_propertyKey);
+ listener.onConfigChange(configs, changeContext);
+ } else if (_changeType == LIVE_INSTANCE) {
+ LiveInstanceChangeListener liveInstanceChangeListener =
+ (LiveInstanceChangeListener) _listener;
+ subscribeForChanges(changeContext, _path, true, true);
+ List<LiveInstance> liveInstances = _accessor.getChildValues(_propertyKey);
+
+ liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
+
+ } else if (_changeType == CURRENT_STATE) {
+ CurrentStateChangeListener currentStateChangeListener =
+ (CurrentStateChangeListener) _listener;
+ subscribeForChanges(changeContext, _path, true, true);
+ String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
+
+ List<CurrentState> currentStates = _accessor.getChildValues(_propertyKey);
+
+ currentStateChangeListener.onStateChange(instanceName, currentStates, changeContext);
+
+ } else if (_changeType == MESSAGE) {
+ MessageListener messageListener = (MessageListener) _listener;
+ subscribeForChanges(changeContext, _path, true, false);
+ String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
+ List<Message> messages = _accessor.getChildValues(_propertyKey);
+
+ messageListener.onMessage(instanceName, messages, changeContext);
+
+ } else if (_changeType == MESSAGES_CONTROLLER) {
+ MessageListener messageListener = (MessageListener) _listener;
+ subscribeForChanges(changeContext, _path, true, false);
+ List<Message> messages = _accessor.getChildValues(_propertyKey);
+
+ messageListener.onMessage(_instanceName, messages, changeContext);
+
+ } else if (_changeType == EXTERNAL_VIEW) {
+ ExternalViewChangeListener externalViewListener = (ExternalViewChangeListener) _listener;
+ subscribeForChanges(changeContext, _path, true, true);
+ List<ExternalView> externalViewList = _accessor.getChildValues(_propertyKey);
+
+ externalViewListener.onExternalViewChange(externalViewList, changeContext);
+ } else if (_changeType == ChangeType.CONTROLLER) {
+ ControllerChangeListener controllerChangelistener = (ControllerChangeListener) _listener;
+ subscribeForChanges(changeContext, _path, true, false);
+ controllerChangelistener.onControllerChange(changeContext);
+ } else if (_changeType == ChangeType.HEALTH) {
+ HealthStateChangeListener healthStateChangeListener = (HealthStateChangeListener) _listener;
+ subscribeForChanges(changeContext, _path, true, true); // TODO: figure out
+ // settings here
+ String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
+
+ List<HealthStat> healthReportList = _accessor.getChildValues(_propertyKey);
+
+ healthStateChangeListener.onHealthChange(instanceName, healthReportList, changeContext);
+ }
+
+ long end = System.currentTimeMillis();
+ if (logger.isInfoEnabled()) {
+ logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path + " listener:"
+ + _listener.getClass().getCanonicalName() + " Took: " + (end - start) + "ms");
+ }
+ }
+ }
+
+ private void subscribeChildChange(String path, NotificationContext context) {
+ NotificationContext.Type type = context.getType();
+ if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
+ logger.info(_instanceName + " subscribes child-change. path: " + path
+ + ", listener: " + _listener);
+ _zkClient.subscribeChildChanges(path, this);
+ } else if (type == NotificationContext.Type.FINALIZE) {
+ logger.info(_instanceName + " unsubscribe child-change. path: " + path
+ + ", listener: " + _listener);
+
+ _zkClient.unsubscribeChildChanges(path, this);
+ }
+ }
+
+ private void subscribeDataChange(String path, NotificationContext context) {
+ NotificationContext.Type type = context.getType();
+ if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(_instanceName + " subscribe data-change. path: " + path
+ + ", listener: " + _listener);
+ }
+ _zkClient.subscribeDataChanges(path, this);
+
+ } else if (type == NotificationContext.Type.FINALIZE) {
+ logger.info(_instanceName + " unsubscribe data-change. path: " + path
+ + ", listener: " + _listener);
+
+ _zkClient.unsubscribeDataChanges(path, this);
+ }
+ }
+
+ // TODO watchParent is always true. consider removing it
+ private void subscribeForChanges(NotificationContext context, String path, boolean watchParent,
+ boolean watchChild) {
+ if (watchParent) {
+ subscribeChildChange(path, context);
+ }
+
+ if (watchChild) {
+ try {
+ switch (_changeType) {
+ case CURRENT_STATE:
+ case IDEAL_STATE:
+ case EXTERNAL_VIEW: {
+ // check if bucketized
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ List<ZNRecord> records = baseAccessor.getChildren(path, null, 0);
+ for (ZNRecord record : records) {
+ HelixProperty property = new HelixProperty(record);
+ String childPath = path + "/" + record.getId();
+
+ int bucketSize = property.getBucketSize();
+ if (bucketSize > 0) {
+ // subscribe both data-change and child-change on bucketized parent node
+ // data-change gives a delete-callback which is used to remove watch
+ subscribeChildChange(childPath, context);
+ subscribeDataChange(childPath, context);
+
+ // subscribe data-change on bucketized child
+ List<String> bucketizedChildNames = _zkClient.getChildren(childPath);
+ if (bucketizedChildNames != null) {
+ for (String bucketizedChildName : bucketizedChildNames) {
+ String bucketizedChildPath = childPath + "/" + bucketizedChildName;
+ subscribeDataChange(bucketizedChildPath, context);
+ }
+ }
+ } else {
+ subscribeDataChange(childPath, context);
+ }
+ }
+ break;
+ }
+ default: {
+ List<String> childNames = _zkClient.getChildren(path);
+ if (childNames != null) {
+ for (String childName : childNames) {
+ String childPath = path + "/" + childName;
+ subscribeDataChange(childPath, context);
+ }
+ }
+ break;
+ }
+ }
+ } catch (ZkNoNodeException e) {
+ logger.warn("fail to subscribe child/data change. path: " + path + ", listener: "
+ + _listener, e);
+ }
+ }
+
+ }
+
+ public EventType[] getEventTypes() {
+ return _eventTypes;
+ }
+
+ /**
+ * Invoke the listener so that it sets up the initial values from the zookeeper if any
+ * exist
+ */
+ public void init() {
+ updateNotificationTime(System.nanoTime());
+ try {
+ NotificationContext changeContext = new NotificationContext(_manager);
+ changeContext.setType(NotificationContext.Type.INIT);
+ invoke(changeContext);
+ } catch (Exception e) {
+ String msg = "Exception while invoking init callback for listener:" + _listener;
+ ZKExceptionHandler.getInstance().handle(msg, e);
+ }
+ }
+
+ @Override
+ public void handleDataChange(String dataPath, Object data) {
+ try {
+ updateNotificationTime(System.nanoTime());
+ if (dataPath != null && dataPath.startsWith(_path)) {
+ NotificationContext changeContext = new NotificationContext(_manager);
+ changeContext.setType(NotificationContext.Type.CALLBACK);
+ invoke(changeContext);
+ }
+ } catch (Exception e) {
+ String msg =
+ "exception in handling data-change. path: " + dataPath + ", listener: " + _listener;
+ ZKExceptionHandler.getInstance().handle(msg, e);
+ }
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath) {
+ try {
+ updateNotificationTime(System.nanoTime());
+ if (dataPath != null && dataPath.startsWith(_path)) {
+ logger.info(_instanceName + " unsubscribe data-change. path: " + dataPath
+ + ", listener: " + _listener);
+ _zkClient.unsubscribeDataChanges(dataPath, this);
+
+ // only needed for bucketized parent, but OK if we don't have child-change
+ // watch on the bucketized parent path
+ logger.info(_instanceName + " unsubscribe child-change. path: " + dataPath
+ + ", listener: " + _listener);
+ _zkClient.unsubscribeChildChanges(dataPath, this);
+ // No need to invoke() since this event will be handled by child-change on parent-node
+ // NotificationContext changeContext = new NotificationContext(_manager);
+ // changeContext.setType(NotificationContext.Type.CALLBACK);
+ // invoke(changeContext);
+ }
+ } catch (Exception e) {
+ String msg =
+ "exception in handling data-delete-change. path: " + dataPath + ", listener: "
+ + _listener;
+ ZKExceptionHandler.getInstance().handle(msg, e);
+ }
+ }
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) {
+ try {
+ updateNotificationTime(System.nanoTime());
+ if (parentPath != null && parentPath.startsWith(_path)) {
+ NotificationContext changeContext = new NotificationContext(_manager);
+
+ if (currentChilds == null) {
+ // parentPath has been removed
+ if (parentPath.equals(_path)) {
+ // _path has been removed, remove this listener
+ _manager.removeListener(_propertyKey, _listener);
+ }
+ changeContext.setType(NotificationContext.Type.FINALIZE);
+ } else {
+ changeContext.setType(NotificationContext.Type.CALLBACK);
+ }
+ invoke(changeContext);
+ }
+ } catch (Exception e) {
+ String msg =
+ "exception in handling child-change. instance: " + _instanceName
+ + ", parentPath: " + parentPath + ", listener: " + _listener;
+ ZKExceptionHandler.getInstance().handle(msg, e);
+ }
+ }
+
+ /**
+ * Invoke the listener for the last time so that the listener could clean up resources
+ */
+ public void reset() {
+ try {
+ NotificationContext changeContext = new NotificationContext(_manager);
+ changeContext.setType(NotificationContext.Type.FINALIZE);
+ invoke(changeContext);
+ } catch (Exception e) {
+ String msg = "Exception while resetting the listener:" + _listener;
+ ZKExceptionHandler.getInstance().handle(msg, e);
+ }
+ }
+
+ private void updateNotificationTime(long nanoTime) {
+ long l = _lastNotificationTimeStamp.get();
+ while (nanoTime > l) {
+ boolean b = _lastNotificationTimeStamp.compareAndSet(l, nanoTime);
+ if (b) {
+ break;
+ } else {
+ l = _lastNotificationTimeStamp.get();
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 4fd7a9a..e43eddf 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -345,6 +345,10 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
@Override
public boolean delete(final String path) {
+ return this.delete(path, -1);
+ }
+
+ public boolean delete(final String path, final int version) {
long startT = System.nanoTime();
try {
try {
@@ -352,7 +356,8 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
@Override
public Object call() throws Exception {
- _connection.delete(path);
+ ZkConnection connection = (ZkConnection) _connection;
+ connection.getZookeeper().delete(path, version);
return null;
}
});
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
index d9ea445..136d47e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
@@ -1,5 +1,24 @@
package org.apache.helix.manager.zk;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.HelixAutoController;
import org.apache.helix.HelixConnection;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
index 0717d77..7d6f132 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
@@ -1,5 +1,24 @@
package org.apache.helix.manager.zk;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -22,6 +41,7 @@ import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HealthStateChangeListener;
import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixAutoController;
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.HelixConnection;
import org.apache.helix.HelixConnectionStateListener;
@@ -67,7 +87,7 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
final Set<HelixConnectionStateListener> _connectionListener =
new CopyOnWriteArraySet<HelixConnectionStateListener>();
- final Map<HelixRole, List<CallbackHandler>> _handlers;
+ final Map<HelixRole, List<ZkCallbackHandler>> _handlers;
final HelixManagerProperties _properties;
/**
@@ -88,7 +108,7 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
public ZkHelixConnection(String zkAddr) {
_zkAddr = zkAddr;
- _handlers = new HashMap<HelixRole, List<CallbackHandler>>();
+ _handlers = new HashMap<HelixRole, List<ZkCallbackHandler>>();
/**
* use system property if available
@@ -201,6 +221,11 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
}
@Override
+ public HelixAutoController createAutoController(ClusterId clusterId, ControllerId controllerId) {
+ return new ZkHelixAutoController(this, clusterId, controllerId);
+ }
+
+ @Override
public ClusterAccessor createClusterAccessor(ClusterId clusterId) {
return new ClusterAccessor(clusterId, createDataAccessor(clusterId));
}
@@ -374,14 +399,14 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
public boolean removeListener(HelixRole role, Object listener, PropertyKey key) {
LOG.info("role: " + role + " removing listener: " + listener + " on path: " + key.getPath()
+ " from connection: " + this);
- List<CallbackHandler> toRemove = new ArrayList<CallbackHandler>();
- List<CallbackHandler> handlerList = _handlers.get(role);
+ List<ZkCallbackHandler> toRemove = new ArrayList<ZkCallbackHandler>();
+ List<ZkCallbackHandler> handlerList = _handlers.get(role);
if (handlerList == null) {
return true;
}
synchronized (this) {
- for (CallbackHandler handler : handlerList) {
+ for (ZkCallbackHandler handler : handlerList) {
// compare property-key path and listener reference
if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener)) {
toRemove.add(handler);
@@ -395,7 +420,7 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
}
// handler.reset() may modify the handlers list, so do it outside the iteration
- for (CallbackHandler handler : toRemove) {
+ for (ZkCallbackHandler handler : toRemove) {
handler.reset();
}
@@ -529,16 +554,15 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
void addListener(HelixRole role, Object listener, PropertyKey propertyKey, ChangeType changeType,
EventType[] eventType) {
// checkConnected();
- HelixManager manager = new HelixConnectionAdaptor(role);
PropertyType type = propertyKey.getType();
synchronized (this) {
if (!_handlers.containsKey(role)) {
- _handlers.put(role, new CopyOnWriteArrayList<CallbackHandler>());
+ _handlers.put(role, new CopyOnWriteArrayList<ZkCallbackHandler>());
}
- List<CallbackHandler> handlerList = _handlers.get(role);
+ List<ZkCallbackHandler> handlerList = _handlers.get(role);
- for (CallbackHandler handler : handlerList) {
+ for (ZkCallbackHandler handler : handlerList) {
// compare property-key path and listener reference
if (handler.getPath().equals(propertyKey.getPath())
&& handler.getListener().equals(listener)) {
@@ -549,8 +573,8 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
}
}
- CallbackHandler newHandler =
- new CallbackHandler(manager, _zkclient, propertyKey, listener, eventType, changeType);
+ ZkCallbackHandler newHandler =
+ new ZkCallbackHandler(role, _zkclient, propertyKey, listener, eventType, changeType);
handlerList.add(newHandler);
LOG.info("role: " + role + " added listener: " + listener + " for type: " + type
@@ -560,10 +584,10 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
void initHandlers(HelixRole role) {
synchronized (this) {
- List<CallbackHandler> handlerList = _handlers.get(role);
+ List<ZkCallbackHandler> handlerList = _handlers.get(role);
if (handlerList != null) {
- for (CallbackHandler handler : handlerList) {
+ for (ZkCallbackHandler handler : handlerList) {
handler.init();
LOG.info("role: " + role + ", init handler: " + handler.getPath() + ", "
+ handler.getListener());
@@ -574,10 +598,10 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
void resetHandlers(HelixRole role) {
synchronized (this) {
- List<CallbackHandler> handlerList = _handlers.get(role);
+ List<ZkCallbackHandler> handlerList = _handlers.get(role);
if (handlerList != null) {
- for (CallbackHandler handler : handlerList) {
+ for (ZkCallbackHandler handler : handlerList) {
handler.reset();
LOG.info("role: " + role + ", reset handler: " + handler.getPath() + ", "
+ handler.getListener());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
index b0c5a8b..f116c76 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
@@ -1,5 +1,24 @@
package org.apache.helix.manager.zk;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import java.util.ArrayList;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
index 06e8cd8..77da158 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
@@ -1,5 +1,24 @@
package org.apache.helix.manager.zk;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import java.lang.management.ManagementFactory;
import org.apache.helix.ControllerChangeListener;
@@ -58,11 +77,13 @@ public class ZkHelixLeaderElection implements ControllerChangeListener {
}
try {
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
if (changeContext.getType().equals(NotificationContext.Type.INIT)
|| changeContext.getType().equals(NotificationContext.Type.CALLBACK)) {
LOG.info(_controllerId + " is trying to acquire leadership for cluster: " + _clusterId);
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
while (accessor.getProperty(keyBuilder.controllerLeader()) == null) {
boolean success = tryUpdateController(_manager);
@@ -83,7 +104,17 @@ public class ZkHelixLeaderElection implements ControllerChangeListener {
/**
* clear write-through cache
*/
- _manager.getHelixDataAccessor().getBaseDataAccessor().reset();
+ accessor.getBaseDataAccessor().reset();
+
+ /**
+ * Remove the leader ephemeral znode if this controller is the leader.
+ * Note that the session may expire between checking for leadership and removing the znode;
+ * in that race we may remove a leader znode created by another controller.
+ * This is fine, since it will just trigger another round of leader election.
+ */
+ if (_controller.isLeader()) {
+ accessor.removeProperty(keyBuilder.controllerLeader());
+ }
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
index eba96c9..56e4be8 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
@@ -1,5 +1,24 @@
package org.apache.helix.manager.zk;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
@@ -415,6 +434,10 @@ public class ZkHelixParticipant implements HelixParticipant {
_messagingService.getExecutor().shutdown();
_accessor.shutdown();
+ /**
+ * remove live instance ephemeral znode
+ */
+ _accessor.removeProperty(_keyBuilder.liveInstance(_participantId.stringify()));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java b/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java
index 751aefa..421aab0 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java
@@ -1,5 +1,24 @@
package org.apache.helix.monitoring;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
index 770bba4..45f56e5 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
@@ -1,5 +1,24 @@
package org.apache.helix.participant.statemachine;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
index a17d9f3..320275d 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
@@ -1,5 +1,24 @@
package org.apache.helix.participant.statemachine;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import org.apache.helix.api.id.PartitionId;
public class HelixStateModelFactoryAdaptor<T extends StateModel> extends StateModelFactory<T> {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
index 43e0250..29e0a44 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
@@ -1,5 +1,24 @@
package org.apache.helix.integration;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
@@ -147,6 +166,7 @@ public class TestHelixConnection extends ZkUnitTestBase {
// clean up
controller.stopAsync();
participant.stopAsync();
+ connection.disconnect();
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
new file mode 100644
index 0000000..28b1477
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
@@ -0,0 +1,120 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.HelixAutoController;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.model.LiveInstance;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestZkHelixAutoController extends ZkUnitTestBase {
+ @Test
+ public void testOnConnectedAndDisconnecting() throws Exception {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 2;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 32, // partitions per resource
+ n, // number of nodes
+ 2, // replicas
+ "MasterSlave", true); // do rebalance
+
+ // create connection
+ HelixConnection connection = new ZkHelixConnection(ZK_ADDR);
+ connection.connect();
+
+ // start auto-controller
+ ClusterId clusterId = ClusterId.from(clusterName);
+ final HelixAutoController[] controllers = new HelixAutoController[n];
+ for (int i = 0; i < n; i++) {
+ int port = 12918 + i;
+ ControllerId controllerId = ControllerId.from("localhost_" + port);
+ controllers[i] = connection.createAutoController(clusterId, controllerId);
+ controllers[i].startAsync();
+ }
+
+ // check that the live-instance znodes for localhost_12918 and localhost_12919 exist
+ final HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ for (int i = 0; i < n; i++) {
+ String instanceName = controllers[i].getControllerId().stringify();
+ Assert.assertNotNull(accessor.getProperty(keyBuilder.liveInstance(instanceName)));
+ }
+
+ // check leader znode exists
+ LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+ Assert.assertNotNull(leader);
+ Assert.assertEquals(leader.getInstanceName(), controllers[0].getControllerId().stringify());
+
+ // stop controller localhost_12918
+ controllers[0].stopAsync();
+
+ // check live-instance znode for localhost_12918 is gone
+ String instanceName = controllers[0].getControllerId().stringify();
+ Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(instanceName)));
+
+ // check localhost_12919 becomes the new leader
+ boolean success = TestHelper.verify(new TestHelper.Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+ if (leader == null) {
+ return false;
+ }
+ return leader.getInstanceName().equals(controllers[1].getControllerId().stringify());
+
+ }
+ }, 3 * 1000);
+ Assert.assertTrue(success, "fail to re-elect new leader");
+
+ // clean up
+ connection.disconnect();
+
+ // check live-instance znode for localhost_12919 is gone
+ instanceName = controllers[1].getControllerId().stringify();
+ Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(instanceName)));
+
+ // check leader znode is gone
+ Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java
new file mode 100644
index 0000000..0127edb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java
@@ -0,0 +1,160 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixController;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.model.LiveInstance;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestZkHelixController extends ZkUnitTestBase {
+
+ @Test
+ public void testOnConnectedAndDisconnecting() throws Exception {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 2;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 32, // partitions per resource
+ n, // number of nodes
+ 2, // replicas
+ "MasterSlave", true); // do rebalance
+
+ // create connection
+ HelixConnection connection = new ZkHelixConnection(ZK_ADDR);
+ connection.connect();
+
+ // start controller
+ ClusterId clusterId = ClusterId.from(clusterName);
+ ControllerId controllerId = ControllerId.from("controller");
+ HelixController controller = connection.createController(clusterId, controllerId);
+ controller.startAsync();
+
+ // check leader znode exists
+ HelixDataAccessor accessor = connection.createDataAccessor(clusterId);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+ Assert.assertNotNull(leader);
+ Assert.assertEquals(leader.getInstanceName(), controllerId.stringify());
+
+ // stop controller
+ controller.stopAsync();
+
+ // check leader znode is gone
+ Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
+
+ // clean up
+ connection.disconnect();
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ /**
+ * Removing the leader znode externally should trigger another round of leader election;
+ * this simulates the race condition in
+ * {@link ZkHelixLeaderElection#onControllerChange(org.apache.helix.NotificationContext)}
+ * @throws Exception
+ */
+ @Test
+ public void testRemoveLeaderZnode() throws Exception {
+
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 2;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 32, // partitions per resource
+ n, // number of nodes
+ 2, // replicas
+ "MasterSlave", true); // do rebalance
+
+ // create connection
+ HelixConnection connection = new ZkHelixConnection(ZK_ADDR);
+ connection.connect();
+
+ // create cluster and controller ids
+ ClusterId clusterId = ClusterId.from(clusterName);
+ final ControllerId controllerId = ControllerId.from("controller");
+
+ // start controller
+ HelixController controller = connection.createController(clusterId, controllerId);
+ controller.startAsync();
+
+ // check that the leader znode exists and is held by this controller
+ final HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+ Assert.assertNotNull(leader);
+ Assert.assertEquals(leader.getInstanceName(), controllerId.stringify());
+
+ // remove leader znode externally
+ accessor.removeProperty(keyBuilder.controllerLeader());
+
+ // verify leader is re-elected
+ boolean result = TestHelper.verify(new TestHelper.Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+ if (leader == null) {
+ return false;
+ }
+
+ return leader.getInstanceName().equals(controllerId.stringify());
+ }
+ }, 3 * 1000);
+
+ Assert.assertTrue(result, "Fail to re-elect a new leader");
+
+ // clean up
+ connection.disconnect();
+
+ // check leader znode is gone
+ Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java
new file mode 100644
index 0000000..8658736
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java
@@ -0,0 +1,103 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.integration.TestHelixConnection;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestZkHelixParticipant extends ZkUnitTestBase {
+
+ @Test
+ public void testOnConnectedAndDisconnecting() throws Exception {
+ // debugging aid (disabled): Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 2;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // first participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 32, // partitions per resource
+ n, // number of nodes
+ 2, // replicas
+ "MasterSlave", true); // do rebalance
+
+ // create one connection that is shared by all participants below
+ HelixConnection connection = new ZkHelixConnection(ZK_ADDR);
+ connection.connect();
+
+ // start n participants (localhost_12918, localhost_12919), each with a MasterSlave factory
+ ClusterId clusterId = ClusterId.from(clusterName);
+ HelixParticipant[] participants = new HelixParticipant[n];
+ for (int i = 0; i < n; i++) {
+ int port = 12918 + i;
+ ParticipantId participantId = ParticipantId.from("localhost_" + port);
+
+ participants[i] = connection.createParticipant(clusterId, participantId);
+ participants[i].getStateMachineEngine().registerStateModelFactory(
+ StateModelDefId.from("MasterSlave"), new TestHelixConnection.MockStateModelFactory());
+
+ participants[i].startAsync();
+ }
+
+ // check that live-instance znodes for localhost_12918 and localhost_12919 exist
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ for (int i = 0; i < n; i++) {
+ Assert.assertNotNull(accessor.getProperty(keyBuilder.liveInstance(participants[i].getParticipantId().stringify())));
+ }
+
+ // stop participant localhost_12918 only; localhost_12919 keeps running
+ participants[0].stopAsync();
+
+ // check live-instance znode for localhost_12918 is gone (asserted with no wait after stopAsync())
+ Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(participants[0]
+ .getParticipantId().stringify())));
+
+ // clean up: disconnect the shared connection without stopping localhost_12919 explicitly
+ connection.disconnect();
+
+ // check live-instance znode for localhost_12919 is gone after the disconnect
+ Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(participants[1]
+ .getParticipantId().stringify())));
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
index 2f5a28b..c233417 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
@@ -142,10 +142,9 @@ public class LogicalModelExample {
autoJoinParticipant.stopAsync();
// stop the controller
- helixParticipant.stopAsync();
+ helixController.stopAsync();
// drop the cluster
- Thread.sleep(5000);
dropCluster(clusterId, connection);
connection.disconnect();
}
[2/2] git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/incubator-helix
Posted by zz...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-helix
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/7c444e50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/7c444e50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/7c444e50
Branch: refs/heads/master
Commit: 7c444e50603a79804834b4da8e159222eea3a1f2
Parents: 72d879e 55c9351
Author: zzhang <zz...@apache.org>
Authored: Mon Nov 11 17:18:40 2013 -0800
Committer: zzhang <zz...@apache.org>
Committed: Mon Nov 11 17:18:40 2013 -0800
----------------------------------------------------------------------
.../controller/rebalancer/CustomRebalancer.java | 62 +------
.../rebalancer/FallbackRebalancer.java | 185 +++++++++++++++++++
.../util/ConstraintBasedAssignment.java | 50 +++++
.../stages/BestPossibleStateCalcStage.java | 16 +-
.../controller/stages/ResourceCurrentState.java | 11 ++
.../java/org/apache/helix/model/Partition.java | 2 +
.../java/org/apache/helix/model/Resource.java | 4 +-
.../TestUserDefRebalancerCompatibility.java | 104 +++++++++++
8 files changed, 366 insertions(+), 68 deletions(-)
----------------------------------------------------------------------