You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/08/07 22:46:41 UTC
[4/5] git commit: [HELIX-484] Remove CallbackHandler/ZkCallbackHandler code duplication, [HELIX-486] Remove StateModelFactory/HelixStateModelFactory code duplication, rb=24332
[HELIX-484] Remove CallbackHandler/ZkCallbackHandler code duplication, [HELIX-486] Remove StateModelFactory/HelixStateModelFactory code duplication, rb=24332
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4ae1ff79
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4ae1ff79
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4ae1ff79
Branch: refs/heads/master
Commit: 4ae1ff796a524766b22f5fc184952187dc9cc24b
Parents: dd8226b
Author: zzhang <zz...@apache.org>
Authored: Thu Aug 7 13:45:50 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Thu Aug 7 13:45:50 2014 -0700
----------------------------------------------------------------------
.../apache/helix/webapp/TestResetInstance.java | 9 +-
.../apache/helix/webapp/TestResetResource.java | 9 +-
.../org/apache/helix/agent/AgentStateModel.java | 4 +-
.../helix/agent/AgentStateModelFactory.java | 6 +-
.../api/StateTransitionHandlerFactory.java | 114 +++++
.../org/apache/helix/api/TransitionHandler.java | 106 +++++
.../apache/helix/api/id/StateModelDefId.java | 4 +
.../helix/controller/HelixControllerMain.java | 10 +-
.../helix/manager/zk/CallbackHandler.java | 420 -------------------
.../manager/zk/ParticipantManagerHelper.java | 281 -------------
.../helix/manager/zk/ZkHelixParticipant.java | 5 +-
.../handling/HelixStateTransitionHandler.java | 14 +-
.../helix/participant/CustomCodeInvoker.java | 9 +-
.../DistClusterControllerStateModel.java | 4 +-
.../DistClusterControllerStateModelFactory.java | 8 +-
.../participant/GenericLeaderStandbyModel.java | 7 +-
.../GenericLeaderStandbyStateModelFactory.java | 9 +-
.../participant/HelixCustomCodeRunner.java | 8 +-
.../participant/HelixStateMachineEngine.java | 121 ++----
.../helix/participant/StateMachineEngine.java | 38 +-
.../statemachine/HelixStateModelFactory.java | 118 ------
.../HelixStateModelFactoryAdaptor.java | 36 --
.../statemachine/ScheduledTaskStateModel.java | 34 +-
.../ScheduledTaskStateModelFactory.java | 10 +-
.../participant/statemachine/StateModel.java | 104 -----
.../statemachine/StateModelFactory.java | 116 -----
.../statemachine/StateModelParser.java | 11 +-
.../java/org/apache/helix/task/TaskRunner.java | 6 +-
.../org/apache/helix/task/TaskStateModel.java | 4 +-
.../helix/task/TaskStateModelFactory.java | 6 +-
.../org/apache/helix/tools/ClusterSetup.java | 2 +-
.../org/apache/helix/DummyProcessThread.java | 7 +-
.../src/test/java/org/apache/helix/Mocks.java | 6 +-
.../org/apache/helix/TestHelixTaskExecutor.java | 6 +-
.../org/apache/helix/TestHelixTaskHandler.java | 8 +-
.../test/java/org/apache/helix/TestHelper.java | 29 --
.../TestAddStateModelFactoryAfterConnect.java | 5 +-
.../integration/TestBatchMessageWrapper.java | 10 +-
.../TestCorrectnessOnConnectivityLoss.java | 10 +-
.../TestDistributedClusterController.java | 144 -------
.../helix/integration/TestErrorPartition.java | 14 +-
.../helix/integration/TestHelixConnection.java | 10 +-
.../helix/integration/TestMessageThrottle2.java | 17 +-
.../helix/integration/TestMessagingService.java | 35 +-
.../integration/TestMultiClusterController.java | 144 +++++++
.../integration/TestNonOfflineInitState.java | 3 +-
.../TestPartitionLevelTransitionConstraint.java | 16 +-
.../integration/TestPreferenceListAsQueue.java | 14 +-
.../helix/integration/TestResetInstance.java | 9 +-
.../integration/TestResetPartitionState.java | 9 +-
.../helix/integration/TestResetResource.java | 9 +-
.../helix/integration/TestSchedulerMessage.java | 1 -
.../integration/TestStateTransitionTimeout.java | 21 +-
.../helix/integration/TestZkReconnect.java | 15 +-
.../manager/TestConsecutiveZkSessionExpiry.java | 3 +-
.../TestDistributedControllerManager.java | 203 ---------
.../TestHelixMultiClusterController.java | 204 +++++++++
.../manager/TestParticipantManager.java | 3 +-
.../integration/manager/TestStateModelLeak.java | 48 ++-
.../integration/manager/ZkTestManager.java | 35 --
.../manager/zk/MockMultiClusterController.java | 3 +-
.../helix/manager/zk/MockParticipant.java | 9 +-
.../TestDefaultControllerMsgHandlerFactory.java | 8 +-
.../helix/messaging/TestAsyncCallbackSvc.java | 13 +-
.../helix/mock/participant/DummyProcess.java | 37 +-
.../participant/MockBootstrapModelFactory.java | 8 +-
.../participant/MockBootstrapStateModel.java | 4 +-
.../mock/participant/MockMSModelFactory.java | 12 +-
.../mock/participant/MockMSStateModel.java | 4 +-
.../participant/MockSchemataModelFactory.java | 11 +-
.../participant/MockSchemataStateModel.java | 4 +-
.../org/apache/helix/model/TestConstraint.java | 2 +-
.../helix/participant/MockZKHelixManager.java | 2 +-
.../TestDistControllerStateModelFactory.java | 3 +-
.../statemachine/TestStateModelParser.java | 5 +-
.../apache/helix/examples/BootstrapHandler.java | 17 +-
.../apache/helix/examples/BootstrapProcess.java | 17 +-
.../apache/helix/examples/DummyParticipant.java | 13 +-
.../apache/helix/examples/ExampleProcess.java | 12 +-
.../LeaderStandbyStateModelFactory.java | 11 +-
.../helix/examples/LogicalModelExample.java | 10 +-
.../examples/MasterSlaveStateModelFactory.java | 13 +-
.../OnlineOfflineStateModelFactory.java | 11 +-
.../org/apache/helix/examples/Quickstart.java | 2 +-
.../participant/StatelessServiceStateModel.java | 4 +-
.../StatelessServiceStateModelFactory.java | 6 +-
.../java/org/apache/helix/lockmanager/Lock.java | 4 +-
.../apache/helix/lockmanager/LockFactory.java | 6 +-
.../apache/helix/recipes/rabbitmq/Consumer.java | 6 +-
.../recipes/rabbitmq/ConsumerStateModel.java | 5 +-
.../rabbitmq/ConsumerStateModelFactory.java | 40 --
.../ConsumerStateTransitionHandlerFactory.java | 40 ++
.../helix/filestore/FileStoreStateModel.java | 4 +-
.../filestore/FileStoreStateModelFactory.java | 6 +-
.../helix/taskexecution/TaskStateModel.java | 4 +-
.../taskexecution/TaskStateModelFactory.java | 6 +-
.../helix/userdefinedrebalancer/Lock.java | 4 +-
.../userdefinedrebalancer/LockFactory.java | 6 +-
98 files changed, 1008 insertions(+), 2035 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
index a9ecaa0..6db5107 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
@@ -57,12 +57,9 @@ public class TestResetInstance extends AdminTestBase {
MockController controller = new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
- Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
- {
- put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
- put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
- }
- };
+ Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+ errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+ errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
// start mock participants
MockParticipant[] participants = new MockParticipant[n];
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
index a54b0a3..d066f18 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
@@ -57,12 +57,9 @@ public class TestResetResource extends AdminTestBase {
MockController controller = new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
- Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
- {
- put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
- put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
- }
- };
+ Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+ errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+ errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
// start mock participants
MockParticipant[] participants = new MockParticipant[n];
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
----------------------------------------------------------------------
diff --git a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
index d227ac3..509c0b7 100644
--- a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
+++ b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
@@ -30,17 +30,17 @@ import org.apache.helix.ExternalCommand;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.api.State;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.Message;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.log4j.Logger;
@StateModelInfo(initialState = "OFFLINE", states = {})
-public class AgentStateModel extends StateModel {
+public class AgentStateModel extends TransitionHandler {
private static final Logger _logger = Logger.getLogger(AgentStateModel.class);
private static Pattern pattern = Pattern.compile("(\\{.+?\\})");
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
index 69d45ae..4e74405 100644
--- a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
+++ b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
@@ -19,13 +19,13 @@ package org.apache.helix.agent;
* under the License.
*/
+import org.apache.helix.api.StateTransitionHandlerFactory;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-public class AgentStateModelFactory extends HelixStateModelFactory<AgentStateModel> {
+public class AgentStateModelFactory extends StateTransitionHandlerFactory<AgentStateModel> {
@Override
- public AgentStateModel createNewStateModel(PartitionId partitionKey) {
+ public AgentStateModel createStateTransitionHandler(PartitionId partitionKey) {
AgentStateModel model = new AgentStateModel();
return model;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/api/StateTransitionHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/StateTransitionHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/api/StateTransitionHandlerFactory.java
new file mode 100644
index 0000000..1e2326d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/StateTransitionHandlerFactory.java
@@ -0,0 +1,114 @@
+package org.apache.helix.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.messaging.handling.BatchMessageWrapper;
+
+public abstract class StateTransitionHandlerFactory<T extends TransitionHandler> {
+ /**
+ * map from partitionId to transition-handler
+ */
+ private final ConcurrentMap<PartitionId, T> _transitionHandlerMap =
+ new ConcurrentHashMap<PartitionId, T>();
+
+ /**
+ * map from resourceId to BatchMessageWrapper
+ */
+ private final ConcurrentMap<ResourceId, BatchMessageWrapper> _batchMsgWrapperMap =
+ new ConcurrentHashMap<ResourceId, BatchMessageWrapper>();
+
+ /**
+ * This method will be invoked only once per partition per session
+ * @param partitionId
+ * @return
+ */
+ public abstract T createStateTransitionHandler(PartitionId partitionId);
+
+ /**
+ * Create a state model for a partition
+ * @param partitionId
+ */
+ public T createAndAddSTransitionHandler(PartitionId partitionId) {
+ T stateModel = createStateTransitionHandler(partitionId);
+ _transitionHandlerMap.put(partitionId, stateModel);
+ return stateModel;
+ }
+
+ /**
+ * Get the state model for a partition
+ * @param partitionId
+ * @return state model if exists, null otherwise
+ */
+ public T getTransitionHandler(PartitionId partitionId) {
+ return _transitionHandlerMap.get(partitionId);
+ }
+
+ /**
+ * remove state model for a partition
+ * @param partitionId
+ * @return state model removed or null if not exist
+ */
+ public T removeTransitionHandler(PartitionId partitionId) {
+ return _transitionHandlerMap.remove(partitionId);
+ }
+
+ /**
+ * get partition set
+ * @return partitionId set
+ */
+ public Set<PartitionId> getPartitionSet() {
+ return _transitionHandlerMap.keySet();
+ }
+
+ /**
+ * create a default batch-message-wrapper for a resource
+ * @param resourceId
+ * @return
+ */
+ public BatchMessageWrapper createBatchMessageWrapper(ResourceId resourceId) {
+ return new BatchMessageWrapper();
+ }
+
+ /**
+ * create a batch-message-wrapper for a resource and put it into map
+ * @param resourceId
+ * @return
+ */
+ public BatchMessageWrapper createAndAddBatchMessageWrapper(ResourceId resourceId) {
+ BatchMessageWrapper wrapper = createBatchMessageWrapper(resourceId);
+ _batchMsgWrapperMap.put(resourceId, wrapper);
+ return wrapper;
+ }
+
+ /**
+ * get batch-message-wrapper for a resource
+ * @param resourceId
+ * @return
+ */
+ public BatchMessageWrapper getBatchMessageWrapper(ResourceId resourceId) {
+ return _batchMsgWrapperMap.get(resourceId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/api/TransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/TransitionHandler.java b/helix-core/src/main/java/org/apache/helix/api/TransitionHandler.java
new file mode 100644
index 0000000..ff82a2d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/TransitionHandler.java
@@ -0,0 +1,106 @@
+package org.apache.helix.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+public abstract class TransitionHandler {
+ public static final String DEFAULT_INITIAL_STATE = "OFFLINE";
+ Logger logger = Logger.getLogger(TransitionHandler.class);
+
+ // TODO Get default state from implementation or from state model annotation
+ // StateModel with initial state other than OFFLINE should override this field
+ protected String _currentState = DEFAULT_INITIAL_STATE;
+
+ /**
+ * requested-state is used (e.g. by task-framework) to request next state
+ */
+ protected String _requestedState = null;
+
+ public String getCurrentState() {
+ return _currentState;
+ }
+
+ // @transition(from='from', to='to')
+ public void defaultTransitionHandler() {
+ logger
+ .error("Default default handler. The idea is to invoke this if no transition method is found. Yet to be implemented");
+ }
+
+ public boolean updateState(String newState) {
+ _currentState = newState;
+ return true;
+ }
+
+ /**
+ * Get requested-state
+ * @return requested-state
+ */
+ public String getRequestedState() {
+ return _requestedState;
+ }
+
+ /**
+ * Set requested-state
+ * @param requestedState
+ */
+ public void setRequestedState(String requestedState) {
+ _requestedState = requestedState;
+ }
+
+ /**
+ * Called when error occurs in state transition
+ * TODO:enforce subclass to write this
+ * @param message
+ * @param context
+ * @param error
+ */
+ public void rollbackOnError(Message message, NotificationContext context,
+ StateTransitionError error) {
+
+ logger.error("Default rollback method invoked on error. Error Code: " + error.getCode());
+
+ }
+
+ /**
+ * Called when the state model is reset
+ */
+ public void reset() {
+ logger
+ .warn("Default reset method invoked. Either because the process longer own this resource or session timedout");
+ }
+
+ /**
+ * default transition for drop partition in error state
+ * @param message
+ * @param context
+ * @throws Exception
+ */
+ @Transition(to = "DROPPED", from = "ERROR")
+ public void onBecomeDroppedFromError(Message message, NotificationContext context)
+ throws Exception {
+ logger.info("Default ERROR->DROPPED transition invoked.");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/api/id/StateModelDefId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/id/StateModelDefId.java b/helix-core/src/main/java/org/apache/helix/api/id/StateModelDefId.java
index 7c84f0f..b98cbcd 100644
--- a/helix-core/src/main/java/org/apache/helix/api/id/StateModelDefId.java
+++ b/helix-core/src/main/java/org/apache/helix/api/id/StateModelDefId.java
@@ -26,6 +26,10 @@ import org.codehaus.jackson.annotate.JsonProperty;
public class StateModelDefId extends Id {
public static final StateModelDefId SchedulerTaskQueue = StateModelDefId
.from(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE);
+ public static final StateModelDefId MasterSlave = StateModelDefId.from("MasterSlave");
+ public static final StateModelDefId LeaderStandby = StateModelDefId.from("LeaderStandby");
+ public static final StateModelDefId OnlineOffline = StateModelDefId.from("OnlineOffline");
+
@JsonProperty("id")
private final String _id;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
index 6aa3ab9..ca540c5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
@@ -47,6 +47,7 @@ import org.apache.commons.cli.ParseException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.HelixManagerShutdownHook;
import org.apache.helix.participant.DistClusterControllerStateModelFactory;
import org.apache.helix.participant.StateMachineEngine;
@@ -164,18 +165,11 @@ public class HelixControllerMain {
DistClusterControllerStateModelFactory stateModelFactory =
new DistClusterControllerStateModelFactory(zkConnectString);
- // StateMachineEngine genericStateMachineHandler = new
- // StateMachineEngine();
StateMachineEngine stateMach = manager.getStateMachineEngine();
- stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
- // manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
- // genericStateMachineHandler);
+ stateMach.registerStateModelFactory(StateModelDefId.LeaderStandby, stateModelFactory);
manager.connect();
} else {
logger.error("cluster controller mode:" + controllerMode + " NOT supported");
- // throw new
- // IllegalArgumentException("Unsupported cluster controller mode:" +
- // controllerMode);
}
} catch (Exception e) {
logger.error("Exception while starting controller", e);
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
deleted file mode 100644
index cdb2845..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ /dev/null
@@ -1,420 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import static org.apache.helix.HelixConstants.ChangeType.CONFIG;
-import static org.apache.helix.HelixConstants.ChangeType.CURRENT_STATE;
-import static org.apache.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
-import static org.apache.helix.HelixConstants.ChangeType.IDEAL_STATE;
-import static org.apache.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
-import static org.apache.helix.HelixConstants.ChangeType.MESSAGE;
-import static org.apache.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.ControllerChangeListener;
-import org.apache.helix.CurrentStateChangeListener;
-import org.apache.helix.ExternalViewChangeListener;
-import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.IdealStateChangeListener;
-import org.apache.helix.InstanceConfigChangeListener;
-import org.apache.helix.LiveInstanceChangeListener;
-import org.apache.helix.MessageListener;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.NotificationContext.Type;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.ScopedConfigChangeListener;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-public class CallbackHandler implements IZkChildListener, IZkDataListener
-
-{
- private static Logger logger = Logger.getLogger(CallbackHandler.class);
-
- /**
- * define the next possible notification types
- */
- private static Map<Type, List<Type>> nextNotificationType = new HashMap<Type, List<Type>>();
- static {
- nextNotificationType.put(Type.INIT, Arrays.asList(Type.CALLBACK, Type.FINALIZE));
- nextNotificationType.put(Type.CALLBACK, Arrays.asList(Type.CALLBACK, Type.FINALIZE));
- nextNotificationType.put(Type.FINALIZE, Arrays.asList(Type.INIT));
- }
-
- private final String _path;
- private final Object _listener;
- private final EventType[] _eventTypes;
- private final HelixDataAccessor _accessor;
- private final ChangeType _changeType;
- private final ZkClient _zkClient;
- private final AtomicLong _lastNotificationTimeStamp;
- private final HelixManager _manager;
- private final PropertyKey _propertyKey;
-
- /**
- * maintain the expected notification types
- * this is fix for HELIX-195: race condition between FINALIZE callbacks and Zk callbacks
- */
- private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE);
-
- public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey,
- Object listener, EventType[] eventTypes, ChangeType changeType) {
- if (listener == null) {
- throw new HelixException("listener could not be null");
- }
-
- this._manager = manager;
- this._accessor = manager.getHelixDataAccessor();
- this._zkClient = client;
- this._propertyKey = propertyKey;
- this._path = propertyKey.getPath();
- this._listener = listener;
- this._eventTypes = eventTypes;
- this._changeType = changeType;
- this._lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
- init();
- }
-
- public Object getListener() {
- return _listener;
- }
-
- public String getPath() {
- return _path;
- }
-
- public void invoke(NotificationContext changeContext) throws Exception {
- // This allows the listener to work with one change at a time
- synchronized (_manager) {
- Type type = changeContext.getType();
- if (!_expectTypes.contains(type)) {
- logger.warn("Skip processing callbacks for listener: " + _listener + ", path: " + _path
- + ", expected types: " + _expectTypes + " but was " + type);
- return;
- }
- _expectTypes = nextNotificationType.get(type);
-
- // Builder keyBuilder = _accessor.keyBuilder();
- long start = System.currentTimeMillis();
- if (logger.isInfoEnabled()) {
- logger.info(Thread.currentThread().getId() + " START:INVOKE " + _path + " listener:"
- + _listener.getClass().getCanonicalName());
- }
-
- if (_changeType == IDEAL_STATE) {
-
- IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, true);
- List<IdealState> idealStates = _accessor.getChildValues(_propertyKey);
-
- idealStateChangeListener.onIdealStateChange(idealStates, changeContext);
-
- } else if (_changeType == ChangeType.INSTANCE_CONFIG) {
- subscribeForChanges(changeContext, _path, true, true);
- InstanceConfigChangeListener listener = (InstanceConfigChangeListener) _listener;
- List<InstanceConfig> configs = _accessor.getChildValues(_propertyKey);
- listener.onInstanceConfigChange(configs, changeContext);
- } else if (_changeType == CONFIG) {
- subscribeForChanges(changeContext, _path, true, true);
- ScopedConfigChangeListener listener = (ScopedConfigChangeListener) _listener;
- List<HelixProperty> configs = _accessor.getChildValues(_propertyKey);
- listener.onConfigChange(configs, changeContext);
- } else if (_changeType == LIVE_INSTANCE) {
- LiveInstanceChangeListener liveInstanceChangeListener =
- (LiveInstanceChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, true);
- List<LiveInstance> liveInstances = _accessor.getChildValues(_propertyKey);
-
- liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
-
- } else if (_changeType == CURRENT_STATE) {
- CurrentStateChangeListener currentStateChangeListener =
- (CurrentStateChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, true);
- String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
-
- List<CurrentState> currentStates = _accessor.getChildValues(_propertyKey);
-
- currentStateChangeListener.onStateChange(instanceName, currentStates, changeContext);
-
- } else if (_changeType == MESSAGE) {
- MessageListener messageListener = (MessageListener) _listener;
- subscribeForChanges(changeContext, _path, true, false);
- String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
- List<Message> messages = _accessor.getChildValues(_propertyKey);
-
- messageListener.onMessage(instanceName, messages, changeContext);
-
- } else if (_changeType == MESSAGES_CONTROLLER) {
- MessageListener messageListener = (MessageListener) _listener;
- subscribeForChanges(changeContext, _path, true, false);
- List<Message> messages = _accessor.getChildValues(_propertyKey);
-
- messageListener.onMessage(_manager.getInstanceName(), messages, changeContext);
-
- } else if (_changeType == EXTERNAL_VIEW) {
- ExternalViewChangeListener externalViewListener = (ExternalViewChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, true);
- List<ExternalView> externalViewList = _accessor.getChildValues(_propertyKey);
-
- externalViewListener.onExternalViewChange(externalViewList, changeContext);
- } else if (_changeType == ChangeType.CONTROLLER) {
- ControllerChangeListener controllerChangelistener = (ControllerChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, false);
- controllerChangelistener.onControllerChange(changeContext);
- }
-
- long end = System.currentTimeMillis();
- if (logger.isInfoEnabled()) {
- logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path + " listener:"
- + _listener.getClass().getCanonicalName() + " Took: " + (end - start) + "ms");
- }
- }
- }
-
- private void subscribeChildChange(String path, NotificationContext context) {
- NotificationContext.Type type = context.getType();
- if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
- logger.info(_manager.getInstanceName() + " subscribes child-change. path: " + path
- + ", listener: " + _listener);
- _zkClient.subscribeChildChanges(path, this);
- } else if (type == NotificationContext.Type.FINALIZE) {
- logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + path
- + ", listener: " + _listener);
-
- _zkClient.unsubscribeChildChanges(path, this);
- }
- }
-
- private void subscribeDataChange(String path, NotificationContext context) {
- NotificationContext.Type type = context.getType();
- if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
- if (logger.isDebugEnabled()) {
- logger.debug(_manager.getInstanceName() + " subscribe data-change. path: " + path
- + ", listener: " + _listener);
- }
- _zkClient.subscribeDataChanges(path, this);
-
- } else if (type == NotificationContext.Type.FINALIZE) {
- logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + path
- + ", listener: " + _listener);
-
- _zkClient.unsubscribeDataChanges(path, this);
- }
- }
-
- // TODO watchParent is always true. consider remove it
- private void subscribeForChanges(NotificationContext context, String path, boolean watchParent,
- boolean watchChild) {
- if (watchParent) {
- subscribeChildChange(path, context);
- }
-
- if (watchChild) {
- try {
- switch (_changeType) {
- case CURRENT_STATE:
- case IDEAL_STATE:
- case EXTERNAL_VIEW: {
- // check if bucketized
- BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
- List<ZNRecord> records = baseAccessor.getChildren(path, null, 0);
- for (ZNRecord record : records) {
- HelixProperty property = new HelixProperty(record);
- String childPath = path + "/" + record.getId();
-
- int bucketSize = property.getBucketSize();
- if (bucketSize > 0) {
- // subscribe both data-change and child-change on bucketized parent node
- // data-change gives a delete-callback which is used to remove watch
- subscribeChildChange(childPath, context);
- subscribeDataChange(childPath, context);
-
- // subscribe data-change on bucketized child
- List<String> bucketizedChildNames = _zkClient.getChildren(childPath);
- if (bucketizedChildNames != null) {
- for (String bucketizedChildName : bucketizedChildNames) {
- String bucketizedChildPath = childPath + "/" + bucketizedChildName;
- subscribeDataChange(bucketizedChildPath, context);
- }
- }
- } else {
- subscribeDataChange(childPath, context);
- }
- }
- break;
- }
- default: {
- List<String> childNames = _zkClient.getChildren(path);
- if (childNames != null) {
- for (String childName : childNames) {
- String childPath = path + "/" + childName;
- subscribeDataChange(childPath, context);
- }
- }
- break;
- }
- }
- } catch (ZkNoNodeException e) {
- logger.warn("fail to subscribe child/data change. path: " + path + ", listener: "
- + _listener, e);
- }
- }
-
- }
-
- public EventType[] getEventTypes() {
- return _eventTypes;
- }
-
- /**
- * Invoke the listener so that it sets up the initial values from the zookeeper if any
- * exists
- */
- public void init() {
- updateNotificationTime(System.nanoTime());
- try {
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.INIT);
- invoke(changeContext);
- } catch (Exception e) {
- String msg = "Exception while invoking init callback for listener:" + _listener;
- ZKExceptionHandler.getInstance().handle(msg, e);
- }
- }
-
- @Override
- public void handleDataChange(String dataPath, Object data) {
- try {
- updateNotificationTime(System.nanoTime());
- if (dataPath != null && dataPath.startsWith(_path)) {
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.CALLBACK);
- invoke(changeContext);
- }
- } catch (Exception e) {
- String msg =
- "exception in handling data-change. path: " + dataPath + ", listener: " + _listener;
- ZKExceptionHandler.getInstance().handle(msg, e);
- }
- }
-
- @Override
- public void handleDataDeleted(String dataPath) {
- try {
- updateNotificationTime(System.nanoTime());
- if (dataPath != null && dataPath.startsWith(_path)) {
- logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + dataPath
- + ", listener: " + _listener);
- _zkClient.unsubscribeDataChanges(dataPath, this);
-
- // only needed for bucketized parent, but OK if we don't have child-change
- // watch on the bucketized parent path
- logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + dataPath
- + ", listener: " + _listener);
- _zkClient.unsubscribeChildChanges(dataPath, this);
- // No need to invoke() since this event will handled by child-change on parent-node
- // NotificationContext changeContext = new NotificationContext(_manager);
- // changeContext.setType(NotificationContext.Type.CALLBACK);
- // invoke(changeContext);
- }
- } catch (Exception e) {
- String msg =
- "exception in handling data-delete-change. path: " + dataPath + ", listener: "
- + _listener;
- ZKExceptionHandler.getInstance().handle(msg, e);
- }
- }
-
- @Override
- public void handleChildChange(String parentPath, List<String> currentChilds) {
- try {
- updateNotificationTime(System.nanoTime());
- if (parentPath != null && parentPath.startsWith(_path)) {
- NotificationContext changeContext = new NotificationContext(_manager);
-
- if (currentChilds == null) {
- // parentPath has been removed
- if (parentPath.equals(_path)) {
- // _path has been removed, remove this listener
- _manager.removeListener(_propertyKey, _listener);
- }
- changeContext.setType(NotificationContext.Type.FINALIZE);
- } else {
- changeContext.setType(NotificationContext.Type.CALLBACK);
- }
- invoke(changeContext);
- }
- } catch (Exception e) {
- String msg =
- "exception in handling child-change. instance: " + _manager.getInstanceName()
- + ", parentPath: " + parentPath + ", listener: " + _listener;
- ZKExceptionHandler.getInstance().handle(msg, e);
- }
- }
-
- /**
- * Invoke the listener for the last time so that the listener could clean up resources
- */
- public void reset() {
- try {
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.FINALIZE);
- invoke(changeContext);
- } catch (Exception e) {
- String msg = "Exception while resetting the listener:" + _listener;
- ZKExceptionHandler.getInstance().handle(msg, e);
- }
- }
-
- private void updateNotificationTime(long nanoTime) {
- long l = _lastNotificationTimeStamp.get();
- while (nanoTime > l) {
- boolean b = _lastNotificationTimeStamp.compareAndSet(l, nanoTime);
- if (b) {
- break;
- } else {
- l = _lastNotificationTimeStamp.get();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
deleted file mode 100644
index c1d856d..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
+++ /dev/null
@@ -1,281 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.lang.management.ManagementFactory;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.apache.helix.AccessOption;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
-import org.apache.helix.LiveInstanceInfoProvider;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.messaging.DefaultMessagingService;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
-
-/**
- * helper class for participant-manager
- */
-public class ParticipantManagerHelper {
- private static Logger LOG = Logger.getLogger(ParticipantManagerHelper.class);
-
- final ZkClient _zkclient;
- final HelixManager _manager;
- final PropertyKey.Builder _keyBuilder;
- final String _clusterName;
- final String _instanceName;
- final String _sessionId;
- final int _sessionTimeout;
- final ConfigAccessor _configAccessor;
- final InstanceType _instanceType;
- final HelixAdmin _helixAdmin;
- final ZKHelixDataAccessor _dataAccessor;
- final DefaultMessagingService _messagingService;
- final StateMachineEngine _stateMachineEngine;
- final LiveInstanceInfoProvider _liveInstanceInfoProvider;
-
- public ParticipantManagerHelper(HelixManager manager, ZkClient zkclient, int sessionTimeout,
- LiveInstanceInfoProvider liveInstanceInfoProvider) {
- _zkclient = zkclient;
- _manager = manager;
- _clusterName = manager.getClusterName();
- _instanceName = manager.getInstanceName();
- _keyBuilder = new PropertyKey.Builder(_clusterName);
- _sessionId = manager.getSessionId();
- _sessionTimeout = sessionTimeout;
- _configAccessor = manager.getConfigAccessor();
- _instanceType = manager.getInstanceType();
- _helixAdmin = manager.getClusterManagmentTool();
- _dataAccessor = (ZKHelixDataAccessor) manager.getHelixDataAccessor();
- _messagingService = (DefaultMessagingService) manager.getMessagingService();
- _stateMachineEngine = manager.getStateMachineEngine();
- _liveInstanceInfoProvider = liveInstanceInfoProvider;
- }
-
- public void joinCluster() {
- // Read cluster config and see if instance can auto join the cluster
- boolean autoJoin = false;
- try {
- HelixConfigScope scope =
- new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(
- _manager.getClusterName()).build();
- autoJoin =
- Boolean.parseBoolean(_configAccessor.get(scope,
- ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
- LOG.info("instance: " + _instanceName + " auto-joining " + _clusterName + " is " + autoJoin);
- } catch (Exception e) {
- // autoJoin is false
- }
-
- if (!ZKUtil.isInstanceSetup(_zkclient, _clusterName, _instanceName, _instanceType)) {
- if (!autoJoin) {
- throw new HelixException("Initial cluster structure is not set up for instance: "
- + _instanceName + ", instanceType: " + _instanceType);
- } else {
- LOG.info(_instanceName + " is auto-joining cluster: " + _clusterName);
- InstanceConfig instanceConfig = new InstanceConfig(_instanceName);
- String hostName = _instanceName;
- String port = "";
- int lastPos = _instanceName.lastIndexOf("_");
- if (lastPos > 0) {
- hostName = _instanceName.substring(0, lastPos);
- port = _instanceName.substring(lastPos + 1);
- }
- instanceConfig.setHostName(hostName);
- instanceConfig.setPort(port);
- instanceConfig.setInstanceEnabled(true);
- _helixAdmin.addInstance(_clusterName, instanceConfig);
- }
- }
- }
-
- public void createLiveInstance() {
- String liveInstancePath = _keyBuilder.liveInstance(_instanceName).getPath();
- LiveInstance liveInstance = new LiveInstance(_instanceName);
- liveInstance.setSessionId(_sessionId);
- liveInstance.setHelixVersion(_manager.getVersion());
- liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
-
- // LiveInstanceInfoProvider liveInstanceInfoProvider = _manager._liveInstanceInfoProvider;
- if (_liveInstanceInfoProvider != null) {
- LOG.info("invoke liveInstanceInfoProvider");
- ZNRecord additionalLiveInstanceInfo =
- _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
- if (additionalLiveInstanceInfo != null) {
- additionalLiveInstanceInfo.merge(liveInstance.getRecord());
- ZNRecord mergedLiveInstance = new ZNRecord(additionalLiveInstanceInfo, _instanceName);
- liveInstance = new LiveInstance(mergedLiveInstance);
- LOG.info("instanceName: " + _instanceName + ", mergedLiveInstance: " + liveInstance);
- }
- }
-
- boolean retry;
- do {
- retry = false;
- try {
- _zkclient.createEphemeral(liveInstancePath, liveInstance.getRecord());
- } catch (ZkNodeExistsException e) {
- LOG.warn("found another instance with same instanceName: " + _instanceName + " in cluster "
- + _clusterName);
-
- Stat stat = new Stat();
- ZNRecord record = _zkclient.readData(liveInstancePath, stat, true);
- if (record == null) {
- /**
- * live-instance is gone as we check it, retry create live-instance
- */
- retry = true;
- } else {
- String ephemeralOwner = Long.toHexString(stat.getEphemeralOwner());
- if (ephemeralOwner.equals(_sessionId)) {
- /**
- * update sessionId field in live-instance if necessary
- */
- LiveInstance curLiveInstance = new LiveInstance(record);
- if (!curLiveInstance.getTypedSessionId().stringify().equals(_sessionId)) {
- /**
- * in last handle-new-session,
- * live-instance is created by new zkconnection with stale session-id inside
- * just update session-id field
- */
- LOG.info("overwriting session-id by ephemeralOwner: " + ephemeralOwner
- + ", old-sessionId: " + curLiveInstance.getTypedSessionId() + ", new-sessionId: "
- + _sessionId);
-
- curLiveInstance.setSessionId(_sessionId);
- _zkclient.writeData(liveInstancePath, curLiveInstance.getRecord());
- }
- } else {
- /**
- * wait for a while, in case previous helix-participant exits unexpectedly
- * and its live-instance still hangs around until session timeout
- */
- try {
- TimeUnit.MILLISECONDS.sleep(_sessionTimeout + 5000);
- } catch (InterruptedException ex) {
- LOG.warn("Sleep interrupted while waiting for previous live-instance to go away.", ex);
- }
- /**
- * give a last try after exit while loop
- */
- retry = true;
- break;
- }
- }
- }
- } while (retry);
-
- /**
- * give a last shot
- */
- if (retry) {
- try {
- _zkclient.createEphemeral(liveInstancePath, liveInstance.getRecord());
- } catch (Exception e) {
- String errorMessage =
- "instance: " + _instanceName + " already has a live-instance in cluster "
- + _clusterName;
- LOG.error(errorMessage);
- throw new HelixException(errorMessage);
- }
- }
- }
-
- /**
- * carry over current-states from last sessions
- * set to initial state for current session only when state doesn't exist in current session
- */
- public void carryOverPreviousCurrentState() {
- List<String> sessions = _dataAccessor.getChildNames(_keyBuilder.sessions(_instanceName));
-
- for (String session : sessions) {
- if (session.equals(_sessionId)) {
- continue;
- }
-
- List<CurrentState> lastCurStates =
- _dataAccessor.getChildValues(_keyBuilder.currentStates(_instanceName, session));
-
- for (CurrentState lastCurState : lastCurStates) {
- LOG.info("Carrying over old session: " + session + ", resource: " + lastCurState.getId()
- + " to current session: " + _sessionId);
- String stateModelDefRef = lastCurState.getStateModelDefRef();
- if (stateModelDefRef == null) {
- LOG.error("skip carry-over because previous current state doesn't have a state model definition. previous current-state: "
- + lastCurState);
- continue;
- }
- StateModelDefinition stateModel =
- _dataAccessor.getProperty(_keyBuilder.stateModelDef(stateModelDefRef));
-
- String curStatePath =
- _keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName())
- .getPath();
- _dataAccessor.getBaseDataAccessor().update(curStatePath,
- new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialState(), lastCurState),
- AccessOption.PERSISTENT);
- }
- }
-
- /**
- * remove previous current state parent nodes
- */
- for (String session : sessions) {
- if (session.equals(_sessionId)) {
- continue;
- }
-
- String path = _keyBuilder.currentStates(_instanceName, session).getPath();
- LOG.info("Removing current states from previous sessions. path: " + path);
- _zkclient.deleteRecursive(path);
- }
- }
-
- public void setupMsgHandler() throws Exception {
- _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
- _stateMachineEngine);
- _manager.addMessageListener(_messagingService.getExecutor(), _instanceName);
-
- ScheduledTaskStateModelFactory stStateModelFactory =
- new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
- _stateMachineEngine.registerStateModelFactory(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory);
- _messagingService.onConnected();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
index af50eb7..81810de 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
@@ -44,6 +44,7 @@ import org.apache.helix.api.config.ParticipantConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.Id;
import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
@@ -348,8 +349,8 @@ public class ZkHelixParticipant implements HelixParticipant {
ScheduledTaskStateModelFactory stStateModelFactory =
new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
- _stateMachineEngine.registerStateModelFactory(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory);
+ _stateMachineEngine.registerStateModelFactory(StateModelDefId.SchedulerTaskQueue,
+ stStateModelFactory);
_messagingService.onConnected();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 1bb6506..4ede2a4 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -43,14 +43,14 @@ import org.apache.helix.ZNRecordBucketizer;
import org.apache.helix.ZNRecordDelta;
import org.apache.helix.ZNRecordDelta.MergeOperation;
import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SessionId;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.Attributes;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelParser;
import org.apache.helix.participant.statemachine.StateTransitionError;
import org.apache.helix.util.StatusUpdateUtil;
@@ -66,16 +66,16 @@ public class HelixStateTransitionHandler extends MessageHandler {
}
private static final Logger logger = Logger.getLogger(HelixStateTransitionHandler.class);
- private final StateModel _stateModel;
+ private final TransitionHandler _stateModel;
StatusUpdateUtil _statusUpdateUtil;
private final StateModelParser _transitionMethodFinder;
private final CurrentState _currentStateDelta;
private final HelixManager _manager;
- private final StateModelFactory<? extends StateModel> _stateModelFactory;
+ private final StateTransitionHandlerFactory<? extends TransitionHandler> _stateModelFactory;
volatile boolean _isTimeout = false;
- public HelixStateTransitionHandler(StateModelFactory<? extends StateModel> stateModelFactory,
- StateModel stateModel, Message message, NotificationContext context,
+ public HelixStateTransitionHandler(StateTransitionHandlerFactory<? extends TransitionHandler> stateModelFactory,
+ TransitionHandler stateModel, Message message, NotificationContext context,
CurrentState currentStateDelta) {
super(message, context);
_stateModel = stateModel;
@@ -206,7 +206,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
deltaList.add(delta);
_currentStateDelta.setDeltaList(deltaList);
- _stateModelFactory.removeStateModel(partitionId.stringify());
+ _stateModelFactory.removeTransitionHandler(partitionId);
} else {
// if the partition is not to be dropped, update _stateModel to the TO_STATE
_stateModel.updateState(toState.toString());
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
index 6c96629..17adf7f 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
@@ -29,6 +29,7 @@ import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.Type;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.id.PartitionId;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
@@ -39,9 +40,9 @@ public class CustomCodeInvoker implements LiveInstanceChangeListener, InstanceCo
ExternalViewChangeListener {
private static Logger LOG = Logger.getLogger(CustomCodeInvoker.class);
private final CustomCodeCallbackHandler _callback;
- private final String _partitionKey;
+ private final PartitionId _partitionKey;
- public CustomCodeInvoker(CustomCodeCallbackHandler callback, String partitionKey) {
+ public CustomCodeInvoker(CustomCodeCallbackHandler callback, PartitionId partitionKey) {
_callback = callback;
_partitionKey = partitionKey;
}
@@ -60,7 +61,7 @@ public class CustomCodeInvoker implements LiveInstanceChangeListener, InstanceCo
String sessionId = manager.getSessionId();
// get resource name from partition key: "PARTICIPANT_LEADER_XXX_0"
- String resourceName = _partitionKey.substring(0, _partitionKey.lastIndexOf('_'));
+ String resourceName = _partitionKey.stringify().substring(0, _partitionKey.stringify().lastIndexOf('_'));
CurrentState curState =
accessor.getProperty(keyBuilder.currentState(instance, sessionId, resourceName));
@@ -68,7 +69,7 @@ public class CustomCodeInvoker implements LiveInstanceChangeListener, InstanceCo
return;
}
- String state = curState.getState(_partitionKey);
+ String state = curState.getState(_partitionKey.stringify());
if (state == null || !state.equalsIgnoreCase("LEADER")) {
return;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
index 0c2eb7c..4579810 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
@@ -23,8 +23,8 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.StateModelParser;
import org.apache.helix.participant.statemachine.StateTransitionError;
@@ -34,7 +34,7 @@ import org.apache.log4j.Logger;
@StateModelInfo(initialState = "OFFLINE", states = {
"LEADER", "STANDBY"
})
-public class DistClusterControllerStateModel extends StateModel {
+public class DistClusterControllerStateModel extends TransitionHandler {
private static Logger logger = Logger.getLogger(DistClusterControllerStateModel.class);
private HelixManager _controller = null;
private final String _zkAddr;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
index a367c81..5f21e90 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
@@ -1,5 +1,8 @@
package org.apache.helix.participant;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.id.PartitionId;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,10 +22,9 @@ package org.apache.helix.participant;
* under the License.
*/
-import org.apache.helix.participant.statemachine.StateModelFactory;
public class DistClusterControllerStateModelFactory extends
- StateModelFactory<DistClusterControllerStateModel> {
+ StateTransitionHandlerFactory<DistClusterControllerStateModel> {
private final String _zkAddr;
public DistClusterControllerStateModelFactory(String zkAddr) {
@@ -30,7 +32,7 @@ public class DistClusterControllerStateModelFactory extends
}
@Override
- public DistClusterControllerStateModel createNewStateModel(String stateUnitKey) {
+ public DistClusterControllerStateModel createStateTransitionHandler(PartitionId partition) {
return new DistClusterControllerStateModel(_zkAddr);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
index 3866cf5..ba23a43 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
@@ -25,8 +25,9 @@ import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.log4j.Logger;
@@ -34,14 +35,14 @@ import org.apache.log4j.Logger;
@StateModelInfo(initialState = "OFFLINE", states = {
"LEADER", "STANDBY"
})
-public class GenericLeaderStandbyModel extends StateModel {
+public class GenericLeaderStandbyModel extends TransitionHandler {
private static Logger LOG = Logger.getLogger(GenericLeaderStandbyModel.class);
private final CustomCodeInvoker _particHolder;
private final List<ChangeType> _notificationTypes;
public GenericLeaderStandbyModel(CustomCodeCallbackHandler callback,
- List<ChangeType> notificationTypes, String partitionKey) {
+ List<ChangeType> notificationTypes, PartitionId partitionKey) {
_particHolder = new CustomCodeInvoker(callback, partitionKey);
_notificationTypes = notificationTypes;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
index 51c91cc..c5d7390 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
@@ -22,10 +22,11 @@ package org.apache.helix.participant;
import java.util.List;
import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.id.PartitionId;
public class GenericLeaderStandbyStateModelFactory extends
- StateModelFactory<GenericLeaderStandbyModel> {
+ StateTransitionHandlerFactory<GenericLeaderStandbyModel> {
private final CustomCodeCallbackHandler _callback;
private final List<ChangeType> _notificationTypes;
@@ -39,7 +40,7 @@ public class GenericLeaderStandbyStateModelFactory extends
}
@Override
- public GenericLeaderStandbyModel createNewStateModel(String partitionKey) {
- return new GenericLeaderStandbyModel(_callback, _notificationTypes, partitionKey);
+ public GenericLeaderStandbyModel createStateTransitionHandler(PartitionId partition) {
+ return new GenericLeaderStandbyModel(_callback, _notificationTypes, partition);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
index 2f169cc..97b266e 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
@@ -55,7 +55,6 @@ import org.apache.log4j.Logger;
* </code>
*/
public class HelixCustomCodeRunner {
- private static final String LEADER_STANDBY = "LeaderStandby";
private static Logger LOG = Logger.getLogger(HelixCustomCodeRunner.class);
private static String PARTICIPANT_LEADER = "PARTICIPANT_LEADER";
@@ -130,7 +129,8 @@ public class HelixCustomCodeRunner {
_stateModelFty = new GenericLeaderStandbyStateModelFactory(_callback, _notificationTypes);
StateMachineEngine stateMach = _manager.getStateMachineEngine();
- stateMach.registerStateModelFactory(LEADER_STANDBY, _stateModelFty, _resourceName);
+ stateMach.registerStateModelFactory(StateModelDefId.LeaderStandby, _resourceName,
+ _stateModelFty);
ZkClient zkClient = null;
try {
// manually add ideal state for participant leader using LeaderStandby
@@ -148,7 +148,7 @@ public class HelixCustomCodeRunner {
idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
idealState.setReplicas(StateModelToken.ANY_LIVEINSTANCE.toString());
idealState.setNumPartitions(1);
- idealState.setStateModelDefId(StateModelDefId.from(LEADER_STANDBY));
+ idealState.setStateModelDefId(StateModelDefId.LeaderStandby);
idealState.setStateModelFactoryId(StateModelFactoryId.from(_resourceName));
List<String> prefList =
new ArrayList<String>(Arrays.asList(StateModelToken.ANY_LIVEINSTANCE.toString()));
@@ -177,7 +177,7 @@ public class HelixCustomCodeRunner {
*/
public void stop() {
LOG.info("Removing stateModelFactory for " + _resourceName);
- _manager.getStateMachineEngine().removeStateModelFactory(LEADER_STANDBY, _stateModelFty,
+ _manager.getStateMachineEngine().removeStateModelFactory(StateModelDefId.LeaderStandby,
_resourceName);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 95afb70..56769b5 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -32,6 +32,8 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.MapKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
@@ -46,10 +48,6 @@ import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-import org.apache.helix.participant.statemachine.HelixStateModelFactoryAdaptor;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelParser;
import org.apache.log4j.Logger;
@@ -58,27 +56,26 @@ public class HelixStateMachineEngine implements StateMachineEngine {
/**
* Map of StateModelDefId to map of FactoryName to StateModelFactory
- * TODO change to use StateModelDefId and HelixStateModelFactory
*/
- private final Map<String, Map<String, StateModelFactory<? extends StateModel>>> _stateModelFactoryMap;
+ private final Map<StateModelDefId, Map<String, StateTransitionHandlerFactory<? extends TransitionHandler>>> _stateModelFactoryMap;
private final StateModelParser _stateModelParser;
private final HelixManager _manager;
- private final ConcurrentHashMap<String, StateModelDefinition> _stateModelDefs;
+ private final ConcurrentHashMap<StateModelDefId, StateModelDefinition> _stateModelDefs;
public HelixStateMachineEngine(HelixManager manager) {
_stateModelParser = new StateModelParser();
_manager = manager;
_stateModelFactoryMap =
- new ConcurrentHashMap<String, Map<String, StateModelFactory<? extends StateModel>>>();
- _stateModelDefs = new ConcurrentHashMap<String, StateModelDefinition>();
+ new ConcurrentHashMap<StateModelDefId, Map<String, StateTransitionHandlerFactory<? extends TransitionHandler>>>();
+ _stateModelDefs = new ConcurrentHashMap<StateModelDefId, StateModelDefinition>();
}
- public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName) {
+ public StateTransitionHandlerFactory<? extends TransitionHandler> getStateModelFactory(StateModelDefId stateModelName) {
return getStateModelFactory(stateModelName, HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
}
- public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName,
+ public StateTransitionHandlerFactory<? extends TransitionHandler> getStateModelFactory(StateModelDefId stateModelName,
String factoryName) {
if (!_stateModelFactoryMap.containsKey(stateModelName)) {
return null;
@@ -86,39 +83,6 @@ public class HelixStateMachineEngine implements StateMachineEngine {
return _stateModelFactoryMap.get(stateModelName).get(factoryName);
}
- @Override
- public boolean registerStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory) {
- return registerStateModelFactory(stateModelDef, factory,
- HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
- }
-
- @Override
- public boolean registerStateModelFactory(String stateModelName,
- StateModelFactory<? extends StateModel> factory, String factoryName) {
- if (stateModelName == null || factory == null || factoryName == null) {
- throw new HelixException("stateModelDef|stateModelFactory|factoryName cannot be null");
- }
-
- LOG.info("Register state model factory for state model " + stateModelName
- + " using factory name " + factoryName + " with " + factory);
-
- if (!_stateModelFactoryMap.containsKey(stateModelName)) {
- _stateModelFactoryMap.put(stateModelName,
- new ConcurrentHashMap<String, StateModelFactory<? extends StateModel>>());
- }
-
- if (_stateModelFactoryMap.get(stateModelName).containsKey(factoryName)) {
- LOG.warn("stateModelFactory for " + stateModelName + " using factoryName " + factoryName
- + " has already been registered.");
- return false;
- }
-
- _stateModelFactoryMap.get(stateModelName).put(factoryName, factory);
- sendNopMessage();
- return true;
- }
-
// TODO: duplicated code in DefaultMessagingService
private void sendNopMessage() {
if (_manager.isConnected()) {
@@ -150,11 +114,11 @@ public class HelixStateMachineEngine implements StateMachineEngine {
@Override
public void reset() {
- for (Map<String, StateModelFactory<? extends StateModel>> ftyMap : _stateModelFactoryMap
+ for (Map<String, StateTransitionHandlerFactory<? extends TransitionHandler>> ftyMap : _stateModelFactoryMap
.values()) {
- for (StateModelFactory<? extends StateModel> stateModelFactory : ftyMap.values()) {
- for (String resourceKey : stateModelFactory.getPartitionSet()) {
- StateModel stateModel = stateModelFactory.getStateModel(resourceKey);
+ for (StateTransitionHandlerFactory<? extends TransitionHandler> stateModelFactory : ftyMap.values()) {
+ for (PartitionId partition : stateModelFactory.getPartitionSet()) {
+ TransitionHandler stateModel = stateModelFactory.getTransitionHandler(partition);
stateModel.reset();
String initialState = _stateModelParser.getInitialState(stateModel.getClass());
stateModel.updateState(initialState);
@@ -191,8 +155,8 @@ public class HelixStateMachineEngine implements StateMachineEngine {
factoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
}
- StateModelFactory<? extends StateModel> stateModelFactory =
- getStateModelFactory(stateModelId.stringify(), factoryName);
+ StateTransitionHandlerFactory<? extends TransitionHandler> stateModelFactory =
+ getStateModelFactory(stateModelId, factoryName);
if (stateModelFactory == null) {
LOG.warn("Fail to create msg-handler because cannot find stateModelFactory for model: "
+ stateModelId + " using factoryName: " + factoryName + " for resource: " + resourceId);
@@ -200,7 +164,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
}
// check if the state model definition exists and cache it
- if (!_stateModelDefs.containsKey(stateModelId.stringify())) {
+ if (!_stateModelDefs.containsKey(stateModelId)) {
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
StateModelDefinition stateModelDef =
@@ -209,15 +173,15 @@ public class HelixStateMachineEngine implements StateMachineEngine {
throw new HelixException("fail to create msg-handler because stateModelDef for "
+ stateModelId + " does NOT exist");
}
- _stateModelDefs.put(stateModelId.stringify(), stateModelDef);
+ _stateModelDefs.put(stateModelId, stateModelDef);
}
if (message.getBatchMessageMode() == false) {
// create currentStateDelta for this partition
- String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
- StateModel stateModel = stateModelFactory.getStateModel(partitionKey.stringify());
+ String initState = _stateModelDefs.get(message.getStateModelDefId()).getInitialState();
+ TransitionHandler stateModel = stateModelFactory.getTransitionHandler(partitionKey);
if (stateModel == null) {
- stateModel = stateModelFactory.createAndAddStateModel(partitionKey.stringify());
+ stateModel = stateModelFactory.createAndAddSTransitionHandler(partitionKey);
stateModel.updateState(initState);
}
@@ -237,9 +201,9 @@ public class HelixStateMachineEngine implements StateMachineEngine {
currentStateDelta);
} else {
BatchMessageWrapper wrapper =
- stateModelFactory.getBatchMessageWrapper(resourceId.stringify());
+ stateModelFactory.getBatchMessageWrapper(resourceId);
if (wrapper == null) {
- wrapper = stateModelFactory.createAndAddBatchMessageWrapper(resourceId.stringify());
+ wrapper = stateModelFactory.createAndAddBatchMessageWrapper(resourceId);
}
// get executor-service for the message
@@ -259,27 +223,15 @@ public class HelixStateMachineEngine implements StateMachineEngine {
}
@Override
- public boolean removeStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory) {
- throw new UnsupportedOperationException("Remove not yet supported");
- }
-
- @Override
- public boolean removeStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory, String factoryName) {
- throw new UnsupportedOperationException("Remove not yet supported");
- }
-
- @Override
public boolean registerStateModelFactory(StateModelDefId stateModelDefId,
- HelixStateModelFactory<? extends StateModel> factory) {
+ StateTransitionHandlerFactory<? extends TransitionHandler> factory) {
return registerStateModelFactory(stateModelDefId, HelixConstants.DEFAULT_STATE_MODEL_FACTORY,
factory);
}
@Override
public boolean registerStateModelFactory(StateModelDefId stateModelDefId, String factoryName,
- HelixStateModelFactory<? extends StateModel> factory) {
+ StateTransitionHandlerFactory<? extends TransitionHandler> factory) {
if (stateModelDefId == null || factoryName == null || factory == null) {
LOG.info("stateModelDefId|factoryName|stateModelFactory is null");
return false;
@@ -288,22 +240,18 @@ public class HelixStateMachineEngine implements StateMachineEngine {
LOG.info("Registering state model factory for state-model-definition: " + stateModelDefId
+ " using factory-name: " + factoryName + " with: " + factory);
- StateModelFactory<? extends StateModel> factoryAdaptor =
- new HelixStateModelFactoryAdaptor(factory);
-
- String stateModelDefName = stateModelDefId.stringify();
- if (!_stateModelFactoryMap.containsKey(stateModelDefName)) {
- _stateModelFactoryMap.put(stateModelDefName,
- new ConcurrentHashMap<String, StateModelFactory<? extends StateModel>>());
+ if (!_stateModelFactoryMap.containsKey(stateModelDefId)) {
+ _stateModelFactoryMap.put(stateModelDefId,
+ new ConcurrentHashMap<String, StateTransitionHandlerFactory<? extends TransitionHandler>>());
}
- if (_stateModelFactoryMap.get(stateModelDefName).containsKey(factoryName)) {
+ if (_stateModelFactoryMap.get(stateModelDefId).containsKey(factoryName)) {
LOG.info("Skip register state model factory for " + stateModelDefId + " using factory-name "
+ factoryName + ", since it has already been registered.");
return false;
}
- _stateModelFactoryMap.get(stateModelDefName).put(factoryName, factoryAdaptor);
+ _stateModelFactoryMap.get(stateModelDefId).put(factoryName, factory);
sendNopMessage();
return true;
@@ -324,15 +272,14 @@ public class HelixStateMachineEngine implements StateMachineEngine {
LOG.info("Removing state model factory for state-model-definition: " + stateModelDefId
+ " using factory-name: " + factoryName);
- String stateModelDefName = stateModelDefId.stringify();
- Map<String, StateModelFactory<? extends StateModel>> ftyMap =
- _stateModelFactoryMap.get(stateModelDefName);
+ Map<String, StateTransitionHandlerFactory<? extends TransitionHandler>> ftyMap =
+ _stateModelFactoryMap.get(stateModelDefId);
if (ftyMap == null) {
LOG.info("Skip remove state model factory " + stateModelDefId + ", since it does NOT exist");
return false;
}
- StateModelFactory<? extends StateModel> fty = ftyMap.remove(factoryName);
+ StateTransitionHandlerFactory<? extends TransitionHandler> fty = ftyMap.remove(factoryName);
if (fty == null) {
LOG.info("Skip remove state model factory " + stateModelDefId + " using factory-name "
+ factoryName + ", since it does NOT exist");
@@ -340,11 +287,11 @@ public class HelixStateMachineEngine implements StateMachineEngine {
}
if (ftyMap.isEmpty()) {
- _stateModelFactoryMap.remove(stateModelDefName);
+ _stateModelFactoryMap.remove(stateModelDefId);
}
- for (String partition : fty.getPartitionSet()) {
- StateModel stateModel = fty.getStateModel(partition);
+ for (PartitionId partition : fty.getPartitionSet()) {
+ TransitionHandler stateModel = fty.getTransitionHandler(partition);
stateModel.reset();
// TODO probably should remove the state from zookeeper
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
index abb7d81..48f06d3 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
@@ -19,11 +19,10 @@ package org.apache.helix.participant;
* under the License.
*/
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
/**
* Helix participant uses this class to register/remove state model factory
@@ -32,35 +31,6 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
public interface StateMachineEngine extends MessageHandlerFactory {
/**
- * Replaced by {@link #registerStateModelFactory(StateModelDefId, HelixStateModelFactory)
-
- */
- @Deprecated
- public boolean registerStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory);
-
- /**
- * Replaced by {@link #registerStateModelFactory(StateModelDefId, String, HelixStateModelFactory)}
- */
- @Deprecated
- public boolean registerStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory, String factoryName);
-
- /**
- * Replaced by {@link #removeStateModelFactory(StateModelDefId, HelixStateModelFactory)}
- */
- @Deprecated
- public boolean removeStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory);
-
- /**
- * Replaced by {@link #removeStateModelFactory(StateModelDefId, String, HelixStateModelFactory)}
- */
- @Deprecated
- public boolean removeStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory, String factoryName);
-
- /**
* Register a default state model factory for a state model definition
* A state model definition could be, for example:
* "MasterSlave", "OnlineOffline", "LeaderStandby", etc.
@@ -70,7 +40,7 @@ public interface StateMachineEngine extends MessageHandlerFactory {
* @return
*/
public boolean registerStateModelFactory(StateModelDefId stateModelDefId,
- HelixStateModelFactory<? extends StateModel> factory);
+ StateTransitionHandlerFactory<? extends TransitionHandler> factory);
/**
* Register a state model factory with a factory name for a state model definition
@@ -81,7 +51,7 @@ public interface StateMachineEngine extends MessageHandlerFactory {
* @return
*/
public boolean registerStateModelFactory(StateModelDefId stateModelDefId, String factoryName,
- HelixStateModelFactory<? extends StateModel> factory);
+ StateTransitionHandlerFactory<? extends TransitionHandler> factory);
/**
* Remove the default state model factory for a state model definition