You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/08/07 22:46:38 UTC
[1/5] [HELIX-484] Remove CallbackHandler/ZkCallbackHandler code
duplication,
[HELIX-486] Remove StateModelFactory/HelixStateModelFactory code duplication,
rb=24332
Repository: helix
Updated Branches:
refs/heads/master 8ba16a8e2 -> d35e29ad0
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/Lock.java
----------------------------------------------------------------------
diff --git a/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/Lock.java b/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/Lock.java
index 2ca3153..a4c65d2 100644
--- a/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/Lock.java
+++ b/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/Lock.java
@@ -20,15 +20,15 @@ package org.apache.helix.lockmanager;
*/
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
@StateModelInfo(initialState = "OFFLINE", states = {
"OFFLINE", "ONLINE"
})
-public class Lock extends StateModel {
+public class Lock extends TransitionHandler {
private String lockName;
public Lock(String lockName) {
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java
----------------------------------------------------------------------
diff --git a/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java b/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java
index ab423f4..27df916 100644
--- a/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java
+++ b/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java
@@ -19,12 +19,12 @@ package org.apache.helix.lockmanager;
* under the License.
*/
+import org.apache.helix.api.StateTransitionHandlerFactory;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-public class LockFactory extends HelixStateModelFactory<Lock> {
+public class LockFactory extends StateTransitionHandlerFactory<Lock> {
@Override
- public Lock createNewStateModel(PartitionId lockName) {
+ public Lock createStateTransitionHandler(PartitionId lockName) {
return new Lock(lockName.toString());
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
index 0b164b3..de56171 100644
--- a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
@@ -52,10 +52,10 @@ public class Consumer {
InstanceType.PARTICIPANT, _zkAddr);
StateMachineEngine stateMach = _manager.getStateMachineEngine();
- ConsumerStateModelFactory modelFactory =
- new ConsumerStateModelFactory(_consumerId, _mqServer);
+ ConsumerStateTransitionHandlerFactory transitionHandlerFactory =
+ new ConsumerStateTransitionHandlerFactory(_consumerId, _mqServer);
stateMach.registerStateModelFactory(
- StateModelDefId.from(SetupConsumerCluster.DEFAULT_STATE_MODEL), modelFactory);
+ StateModelDefId.from(SetupConsumerCluster.DEFAULT_STATE_MODEL), transitionHandlerFactory);
_manager.connect();
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModel.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModel.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModel.java
index 24e4a40..b41ccf7 100644
--- a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModel.java
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModel.java
@@ -20,17 +20,16 @@ package org.apache.helix.recipes.rabbitmq;
*/
import org.apache.log4j.Logger;
-
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
@StateModelInfo(initialState = "OFFLINE", states = {
"ONLINE", "ERROR"
})
-public class ConsumerStateModel extends StateModel {
+public class ConsumerStateModel extends TransitionHandler {
private static Logger LOG = Logger.getLogger(ConsumerStateModel.class);
private final String _consumerId;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
deleted file mode 100644
index 98cce35..0000000
--- a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.helix.recipes.rabbitmq;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-
-public class ConsumerStateModelFactory extends HelixStateModelFactory<ConsumerStateModel> {
- private final String _consumerId;
- private final String _mqServer;
-
- public ConsumerStateModelFactory(String consumerId, String msServer) {
- _consumerId = consumerId;
- _mqServer = msServer;
- }
-
- @Override
- public ConsumerStateModel createNewStateModel(PartitionId partition) {
- ConsumerStateModel model =
- new ConsumerStateModel(_consumerId, partition.stringify(), _mqServer);
- return model;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateTransitionHandlerFactory.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateTransitionHandlerFactory.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateTransitionHandlerFactory.java
new file mode 100644
index 0000000..a0d11f8
--- /dev/null
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateTransitionHandlerFactory.java
@@ -0,0 +1,40 @@
+package org.apache.helix.recipes.rabbitmq;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.id.PartitionId;
+
+public class ConsumerStateTransitionHandlerFactory extends StateTransitionHandlerFactory<ConsumerStateModel> {
+ private final String _consumerId;
+ private final String _mqServer;
+
+ public ConsumerStateTransitionHandlerFactory(String consumerId, String msServer) {
+ _consumerId = consumerId;
+ _mqServer = msServer;
+ }
+
+ @Override
+ public ConsumerStateModel createStateTransitionHandler(PartitionId partition) {
+ ConsumerStateModel model =
+ new ConsumerStateModel(_consumerId, partition.stringify(), _mqServer);
+ return model;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModel.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModel.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModel.java
index 6eaf808..e41e52b 100644
--- a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModel.java
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModel.java
@@ -24,9 +24,9 @@ import org.apache.helix.AccessOption;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -36,7 +36,7 @@ import org.apache.zookeeper.data.Stat;
@StateModelInfo(initialState = "OFFLINE", states = {
"OFFLINE", "MASTER", "SLAVE"
})
-public class FileStoreStateModel extends StateModel {
+public class FileStoreStateModel extends TransitionHandler {
private final class HighWaterMarkUpdater implements DataUpdater<ZNRecord> {
private final Message message;
private final ChangeRecord lastRecordProcessed;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
index 7e1938c..b1b3e44 100644
--- a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
@@ -20,10 +20,10 @@ package org.apache.helix.filestore;
*/
import org.apache.helix.HelixManager;
+import org.apache.helix.api.StateTransitionHandlerFactory;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-public class FileStoreStateModelFactory extends HelixStateModelFactory<FileStoreStateModel> {
+public class FileStoreStateModelFactory extends StateTransitionHandlerFactory<FileStoreStateModel> {
private final HelixManager manager;
public FileStoreStateModelFactory(HelixManager manager) {
@@ -31,7 +31,7 @@ public class FileStoreStateModelFactory extends HelixStateModelFactory<FileStore
}
@Override
- public FileStoreStateModel createNewStateModel(PartitionId partition) {
+ public FileStoreStateModel createStateTransitionHandler(PartitionId partition) {
FileStoreStateModel model;
model =
new FileStoreStateModel(manager, partition.toString().split("_")[0], partition.toString());
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModel.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModel.java
index 3c1cab4..025af38 100644
--- a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModel.java
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModel.java
@@ -24,12 +24,12 @@ import java.util.Set;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.Message;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.log4j.Logger;
@@ -37,7 +37,7 @@ import org.apache.log4j.Logger;
@StateModelInfo(initialState = "OFFLINE", states = {
"ONLINE", "ERROR"
})
-public class TaskStateModel extends StateModel {
+public class TaskStateModel extends TransitionHandler {
private static Logger LOG = Logger.getLogger(TaskStateModel.class);
private final String _workerId;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
index 6948237..9d6b978 100644
--- a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
@@ -19,10 +19,10 @@ package org.apache.helix.taskexecution;
* under the License.
*/
+import org.apache.helix.api.StateTransitionHandlerFactory;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-public class TaskStateModelFactory extends HelixStateModelFactory<TaskStateModel> {
+public class TaskStateModelFactory extends StateTransitionHandlerFactory<TaskStateModel> {
private final String _workerId;
private final TaskFactory _taskFactory;
private TaskResultStore _taskResultStore;
@@ -35,7 +35,7 @@ public class TaskStateModelFactory extends HelixStateModelFactory<TaskStateModel
}
@Override
- public TaskStateModel createNewStateModel(PartitionId partition) {
+ public TaskStateModel createStateTransitionHandler(PartitionId partition) {
TaskStateModel model =
new TaskStateModel(_workerId, partition.toString(), _taskFactory, _taskResultStore);
return model;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/Lock.java
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/Lock.java b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/Lock.java
index 308ae14..a185363 100644
--- a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/Lock.java
+++ b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/Lock.java
@@ -20,15 +20,15 @@ package org.apache.helix.userdefinedrebalancer;
*/
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
@StateModelInfo(initialState = "RELEASED", states = {
"RELEASED", "LOCKED"
})
-public class Lock extends StateModel {
+public class Lock extends TransitionHandler {
private String lockName;
public Lock(String lockName) {
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockFactory.java
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockFactory.java b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockFactory.java
index c607b1b..de0d5aa 100644
--- a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockFactory.java
+++ b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockFactory.java
@@ -19,17 +19,17 @@ package org.apache.helix.userdefinedrebalancer;
* under the License.
*/
+import org.apache.helix.api.StateTransitionHandlerFactory;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
/**
* This factory allows a participant to get the appropriate state model callbacks for the lock
* manager state model. This is used exactly once per participant to get a valid instance of a Lock,
* and then the same Lock instance is used for all state transition callbacks.
*/
-public class LockFactory extends HelixStateModelFactory<Lock> {
+public class LockFactory extends StateTransitionHandlerFactory<Lock> {
@Override
- public Lock createNewStateModel(PartitionId partitionId) {
+ public Lock createStateTransitionHandler(PartitionId partitionId) {
// TODO Auto-generated method stub
return new Lock(partitionId.stringify());
}
[5/5] git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/helix
Posted by zz...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/helix
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d35e29ad
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d35e29ad
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d35e29ad
Branch: refs/heads/master
Commit: d35e29ad0236af9cfbc476060effb3975a83d191
Parents: 4ae1ff7 8ba16a8
Author: zzhang <zz...@apache.org>
Authored: Thu Aug 7 13:46:13 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Thu Aug 7 13:46:13 2014 -0700
----------------------------------------------------------------------
.../integration/TestPreferenceListAsQueue.java | 171 +++++++++++--------
1 file changed, 97 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/d35e29ad/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
----------------------------------------------------------------------
[2/5] [HELIX-484] Remove CallbackHandler/ZkCallbackHandler code
duplication,
[HELIX-486] Remove StateModelFactory/HelixStateModelFactory code duplication,
rb=24332
Posted by zz...@apache.org.
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
index c4304b0..b929d5e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
@@ -30,8 +30,10 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.manager.zk.MockController;
import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
@@ -41,7 +43,6 @@ import org.apache.helix.mock.participant.SleepTransition;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.StateTransitionError;
import org.apache.helix.participant.statemachine.Transition;
@@ -146,26 +147,26 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
}
}
- public static class SleepStateModelFactory extends StateModelFactory<TimeOutStateModel> {
- Set<String> partitionsToSleep = new HashSet<String>();
+ public static class SleepStateModelFactory extends StateTransitionHandlerFactory<TimeOutStateModel> {
+ Set<PartitionId> partitionsToSleep = new HashSet<PartitionId>();
int _sleepTime;
public SleepStateModelFactory(int sleepTime) {
_sleepTime = sleepTime;
}
- public void setPartitions(Collection<String> partitions) {
+ public void setPartitions(Collection<PartitionId> partitions) {
partitionsToSleep.addAll(partitions);
}
- public void addPartition(String partition) {
+ public void addPartition(PartitionId partition) {
partitionsToSleep.add(partition);
}
@Override
- public TimeOutStateModel createNewStateModel(String stateUnitKey) {
+ public TimeOutStateModel createStateTransitionHandler(PartitionId partition) {
return new TimeOutStateModel(new SleepTransition(_sleepTime),
- partitionsToSleep.contains(stateUnitKey));
+ partitionsToSleep.contains(partition));
}
}
@@ -181,12 +182,12 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
factories.put(instanceName, factory);
for (PartitionId p : idealState.getPartitionIdSet()) {
if (idealState.getPreferenceList(p).get(0).equals(ParticipantId.from(instanceName))) {
- factory.addPartition(p.stringify());
+ factory.addPartition(p);
}
}
_participants[i] = new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName);
- _participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", factory);
+ _participants[i].getStateMachineEngine().registerStateModelFactory(StateModelDefId.from("MasterSlave"), factory);
_participants[i].syncStart();
}
String controllerName = "controller_0";
@@ -205,7 +206,7 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
ParticipantId idealMaster = idealState.getPreferenceList(p).get(0);
Assert.assertTrue(ev.getStateMap(p).get(idealMaster).equals(State.from("ERROR")));
- TimeOutStateModel model = factories.get(idealMaster.stringify()).getStateModel(p.stringify());
+ TimeOutStateModel model = factories.get(idealMaster.stringify()).getTransitionHandler(p);
Assert.assertEquals(model._errorCallcount, 1);
Assert.assertEquals(model._error.getCode(), ErrorCode.TIMEOUT);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
index 18f6fd7..08a2f18 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
@@ -32,14 +32,15 @@ import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.api.id.StateModelFactoryId;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.testutil.ZkTestUtil;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -86,13 +87,13 @@ public class TestZkReconnect {
LOG.info("Register state machine");
final CountDownLatch latch = new CountDownLatch(1);
- participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
- new StateModelFactory<StateModel>() {
+ participant.getStateMachineEngine().registerStateModelFactory(
+ StateModelDefId.OnlineOffline, "test", new StateTransitionHandlerFactory<TransitionHandler>() {
@Override
- public StateModel createNewStateModel(String stateUnitKey) {
+ public TransitionHandler createStateTransitionHandler(PartitionId stateUnitKey) {
return new SimpleStateModel(latch);
}
- }, "test");
+ });
String resourceName = "test-resource";
LOG.info("Ideal state assignment");
@@ -144,7 +145,7 @@ public class TestZkReconnect {
}
}
- public static final class SimpleStateModel extends StateModel {
+ public static final class SimpleStateModel extends TransitionHandler {
private final CountDownLatch latch;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
index 99986ef..1b55af5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
@@ -28,6 +28,7 @@ import org.apache.helix.PreConnectCallback;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.manager.zk.MockMultiClusterController;
import org.apache.helix.manager.zk.MockController;
@@ -186,7 +187,7 @@ public class TestConsecutiveZkSessionExpiry extends ZkTestBase {
for (int i = 0; i < n; i++) {
String contrllerName = "localhost_" + (12918 + i);
distributedControllers[i] = new MockMultiClusterController(_zkaddr, clusterName, contrllerName);
- distributedControllers[i].getStateMachineEngine().registerStateModelFactory("MasterSlave",
+ distributedControllers[i].getStateMachineEngine().registerStateModelFactory(StateModelDefId.MasterSlave,
new MockMSModelFactory());
if (i == 0) {
distributedControllers[i].addPreConnectCallback(new PreConnectTestCallback(contrllerName,
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
deleted file mode 100644
index 18234b5..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
+++ /dev/null
@@ -1,203 +0,0 @@
-package org.apache.helix.integration.manager;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-import java.util.List;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZkTestHelper;
-import org.apache.helix.manager.zk.MockMultiClusterController;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkCallbackHandler;
-import org.apache.helix.mock.participant.MockMSModelFactory;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.testutil.ZkTestBase;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestDistributedControllerManager extends ZkTestBase {
- private static Logger LOG = Logger.getLogger(TestDistributedControllerManager.class);
-
- @Test
- public void simpleIntegrationTest() throws Exception {
- // Logger.getRootLogger().setLevel(Level.INFO);
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
- int n = 2;
-
- System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 4, // partitions per resource
- n, // number of nodes
- 2, // replicas
- "MasterSlave", true); // do rebalance
-
- HelixManager[] distributedControllers = new HelixManager[n];
- for (int i = 0; i < n; i++) {
- int port = 12918 + i;
- distributedControllers[i] =
- new ZKHelixManager(clusterName, "localhost_" + port, InstanceType.CONTROLLER_PARTICIPANT,
- _zkaddr);
- distributedControllers[i].getStateMachineEngine().registerStateModelFactory("MasterSlave",
- new MockMSModelFactory());
- distributedControllers[i].connect();
- }
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
- clusterName));
- Assert.assertTrue(result);
-
- // disconnect first distributed-controller, and verify second takes leadership
- distributedControllers[0].disconnect();
-
- // verify leader changes to localhost_12919
- Thread.sleep(100);
- result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
- clusterName));
- Assert.assertTrue(result);
-
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, _baseAccessor);
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance("localhost_12918")));
- LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
- Assert.assertNotNull(leader);
- Assert.assertEquals(leader.getId(), "localhost_12919");
-
- // clean up
- distributedControllers[1].disconnect();
- Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance("localhost_12919")));
- Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
-
- System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- }
-
- /**
- * expire a controller and make sure the other takes the leadership
- * @param expireController
- * @param newController
- * @throws Exception
- */
- void expireController(MockMultiClusterController expireController,
- MockMultiClusterController newController) throws Exception {
- String clusterName = expireController.getClusterName();
- LOG.info("Expiring distributedController: " + expireController.getInstanceName()
- + ", session: " + expireController.getSessionId() + " ...");
- String oldSessionId = expireController.getSessionId();
-
- ZkTestHelper.expireSession(expireController.getZkClient());
- String newSessionId = expireController.getSessionId();
- LOG.debug("Expried distributedController: " + expireController.getInstanceName()
- + ", oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
-
- boolean result =
- ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
- _zkaddr, clusterName));
- Assert.assertTrue(result);
-
- // verify leader changes to localhost_12919
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, _baseAccessor);
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- Assert.assertNotNull(accessor.getProperty(keyBuilder.liveInstance(expireController
- .getInstanceName())));
- LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
- Assert.assertNotNull(leader);
- Assert.assertEquals(leader.getId(), newController.getInstanceName());
-
- // check expired-controller has 2 handlers: message and leader-election
- TestHelper.printHandlers(expireController, expireController.getHandlers());
-
- List<ZkCallbackHandler> handlers = expireController.getHandlers();
- Assert.assertEquals(handlers.size(), 2,
- "Distributed controller should have 2 handler (message and leader-election) after lose leadership, but was "
- + handlers.size());
- }
-
- @Test
- public void simpleSessionExpiryTest() throws Exception {
- // Logger.getRootLogger().setLevel(Level.INFO);
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
- int n = 2;
-
- System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 4, // partitions per resource
- n, // number of nodes
- 2, // replicas
- "MasterSlave", true); // do rebalance
-
- MockMultiClusterController[] distributedControllers = new MockMultiClusterController[n];
-
- for (int i = 0; i < n; i++) {
- String contrllerName = "localhost_" + (12918 + i);
- distributedControllers[i] =
- new MockMultiClusterController(_zkaddr, clusterName, contrllerName);
- distributedControllers[i].getStateMachineEngine().registerStateModelFactory("MasterSlave",
- new MockMSModelFactory());
- distributedControllers[i].connect();
- }
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
- clusterName));
- Assert.assertTrue(result);
-
- // expire localhost_12918
- expireController(distributedControllers[0], distributedControllers[1]);
-
- // expire localhost_12919
- expireController(distributedControllers[1], distributedControllers[0]);
-
- // clean up
- ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-
- for (int i = 0; i < n; i++) {
- distributedControllers[i].disconnect();
- Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(distributedControllers[i]
- .getInstanceName())));
- }
- Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
-
- System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/manager/TestHelixMultiClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestHelixMultiClusterController.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestHelixMultiClusterController.java
new file mode 100644
index 0000000..51e7e19
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestHelixMultiClusterController.java
@@ -0,0 +1,204 @@
+package org.apache.helix.integration.manager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.manager.zk.MockMultiClusterController;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkCallbackHandler;
+import org.apache.helix.mock.participant.MockMSModelFactory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.testutil.ZkTestBase;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestHelixMultiClusterController extends ZkTestBase {
+ private static Logger LOG = Logger.getLogger(TestHelixMultiClusterController.class);
+
+ @Test
+ public void simpleIntegrationTest() throws Exception {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 2;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 4, // partitions per resource
+ n, // number of nodes
+ 2, // replicas
+ "MasterSlave", true); // do rebalance
+
+ HelixManager[] distributedControllers = new HelixManager[n];
+ for (int i = 0; i < n; i++) {
+ int port = 12918 + i;
+ distributedControllers[i] =
+ new ZKHelixManager(clusterName, "localhost_" + port, InstanceType.CONTROLLER_PARTICIPANT,
+ _zkaddr);
+ distributedControllers[i].getStateMachineEngine().registerStateModelFactory(StateModelDefId.MasterSlave,
+ new MockMSModelFactory());
+ distributedControllers[i].connect();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // disconnect first distributed-controller, and verify second takes leadership
+ distributedControllers[0].disconnect();
+
+ // verify leader changes to localhost_12919
+ Thread.sleep(100);
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
+ clusterName));
+ Assert.assertTrue(result);
+
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, _baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance("localhost_12918")));
+ LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+ Assert.assertNotNull(leader);
+ Assert.assertEquals(leader.getId(), "localhost_12919");
+
+ // clean up
+ distributedControllers[1].disconnect();
+ Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance("localhost_12919")));
+ Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ /**
+ * expire a controller and make sure the other takes the leadership
+ * @param expireController
+ * @param newController
+ * @throws Exception
+ */
+ void expireController(MockMultiClusterController expireController,
+ MockMultiClusterController newController) throws Exception {
+ String clusterName = expireController.getClusterName();
+ LOG.info("Expiring distributedController: " + expireController.getInstanceName()
+ + ", session: " + expireController.getSessionId() + " ...");
+ String oldSessionId = expireController.getSessionId();
+
+ ZkTestHelper.expireSession(expireController.getZkClient());
+ String newSessionId = expireController.getSessionId();
+ LOG.debug("Expried distributedController: " + expireController.getInstanceName()
+ + ", oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
+
+ boolean result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+ _zkaddr, clusterName));
+ Assert.assertTrue(result);
+
+ // verify leader changes to localhost_12919
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, _baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ Assert.assertNotNull(accessor.getProperty(keyBuilder.liveInstance(expireController
+ .getInstanceName())));
+ LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+ Assert.assertNotNull(leader);
+ Assert.assertEquals(leader.getId(), newController.getInstanceName());
+
+ // check expired-controller has 2 handlers: message and leader-election
+ TestHelper.printHandlers(expireController, expireController.getHandlers());
+
+ List<ZkCallbackHandler> handlers = expireController.getHandlers();
+ Assert.assertEquals(handlers.size(), 2,
+ "Distributed controller should have 2 handler (message and leader-election) after lose leadership, but was "
+ + handlers.size());
+ }
+
+ @Test
+ public void simpleSessionExpiryTest() throws Exception {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 2;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 4, // partitions per resource
+ n, // number of nodes
+ 2, // replicas
+ "MasterSlave", true); // do rebalance
+
+ MockMultiClusterController[] distributedControllers = new MockMultiClusterController[n];
+
+ for (int i = 0; i < n; i++) {
+ String contrllerName = "localhost_" + (12918 + i);
+ distributedControllers[i] =
+ new MockMultiClusterController(_zkaddr, clusterName, contrllerName);
+ distributedControllers[i].getStateMachineEngine().registerStateModelFactory(StateModelDefId.MasterSlave,
+ new MockMSModelFactory());
+ distributedControllers[i].connect();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // expire localhost_12918
+ expireController(distributedControllers[0], distributedControllers[1]);
+
+ // expire localhost_12919
+ expireController(distributedControllers[1], distributedControllers[0]);
+
+ // clean up
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ for (int i = 0; i < n; i++) {
+ distributedControllers[i].disconnect();
+ Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(distributedControllers[i]
+ .getInstanceName())));
+ }
+ Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
index 309ab18..9ce0941 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
@@ -34,6 +34,7 @@ import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.manager.zk.MockController;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -74,7 +75,7 @@ public class TestParticipantManager extends ZkTestBase {
HelixManager participant =
new ZKHelixManager(clusterName, "localhost_12918", InstanceType.PARTICIPANT, _zkaddr);
- participant.getStateMachineEngine().registerStateModelFactory("MasterSlave",
+ participant.getStateMachineEngine().registerStateModelFactory(StateModelDefId.MasterSlave,
new MockMSModelFactory());
participant.connect();
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
index d6d7bab..eaf74bb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
@@ -28,13 +28,15 @@ import java.util.TreeMap;
import org.apache.helix.HelixAdmin;
import org.apache.helix.TestHelper;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.manager.zk.MockController;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.mock.participant.ErrTransition;
import org.apache.helix.participant.HelixStateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.testutil.ZkTestBase;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -72,8 +74,7 @@ public class TestStateModelLeak extends ZkTestBase {
"MasterSlave", true); // do rebalance
// start controller
- MockController controller =
- new MockController(_zkaddr, clusterName, "controller");
+ MockController controller = new MockController(_zkaddr, clusterName, "controller");
controller.syncStart();
MockParticipant[] participants = new MockParticipant[n];
@@ -92,15 +93,16 @@ public class TestStateModelLeak extends ZkTestBase {
// check state-models in state-machine
HelixStateMachineEngine stateMachine =
(HelixStateMachineEngine) participants[0].getStateMachineEngine();
- StateModelFactory<? extends StateModel> fty = stateMachine.getStateModelFactory("MasterSlave");
- Map<String, String> expectStateModelMap = new TreeMap<String, String>();
- expectStateModelMap.put("TestDB0_0", "SLAVE");
- expectStateModelMap.put("TestDB0_1", "MASTER");
- expectStateModelMap.put("TestDB0_2", "SLAVE");
- expectStateModelMap.put("TestDB0_3", "MASTER");
+ StateTransitionHandlerFactory<? extends TransitionHandler> fty =
+ stateMachine.getStateModelFactory(StateModelDefId.from("MasterSlave"));
+ Map<PartitionId, String> expectStateModelMap = new TreeMap<PartitionId, String>();
+ expectStateModelMap.put(PartitionId.from("TestDB0_0"), "SLAVE");
+ expectStateModelMap.put(PartitionId.from("TestDB0_1"), "MASTER");
+ expectStateModelMap.put(PartitionId.from("TestDB0_2"), "SLAVE");
+ expectStateModelMap.put(PartitionId.from("TestDB0_3"), "MASTER");
checkStateModelMap(fty, expectStateModelMap);
- // drop resource
+ // drop resourcePartitionId
HelixAdmin admin = new ZKHelixAdmin(_zkclient);
admin.dropResource(clusterName, "TestDB0");
@@ -146,8 +148,7 @@ public class TestStateModelLeak extends ZkTestBase {
"MasterSlave", true); // do rebalance
// start controller
- MockController controller =
- new MockController(_zkaddr, clusterName, "controller");
+ MockController controller = new MockController(_zkaddr, clusterName, "controller");
controller.syncStart();
MockParticipant[] participants = new MockParticipant[n];
@@ -177,12 +178,13 @@ public class TestStateModelLeak extends ZkTestBase {
// check state-models in state-machine
HelixStateMachineEngine stateMachine =
(HelixStateMachineEngine) participants[0].getStateMachineEngine();
- StateModelFactory<? extends StateModel> fty = stateMachine.getStateModelFactory("MasterSlave");
- Map<String, String> expectStateModelMap = new TreeMap<String, String>();
- expectStateModelMap.put("TestDB0_0", "ERROR");
- expectStateModelMap.put("TestDB0_1", "MASTER");
- expectStateModelMap.put("TestDB0_2", "SLAVE");
- expectStateModelMap.put("TestDB0_3", "MASTER");
+ StateTransitionHandlerFactory<? extends TransitionHandler> fty =
+ stateMachine.getStateModelFactory(StateModelDefId.from("MasterSlave"));
+ Map<PartitionId, String> expectStateModelMap = new TreeMap<PartitionId, String>();
+ expectStateModelMap.put(PartitionId.from("TestDB0_0"), "ERROR");
+ expectStateModelMap.put(PartitionId.from("TestDB0_1"), "MASTER");
+ expectStateModelMap.put(PartitionId.from("TestDB0_2"), "SLAVE");
+ expectStateModelMap.put(PartitionId.from("TestDB0_3"), "MASTER");
checkStateModelMap(fty, expectStateModelMap);
// drop resource
@@ -212,11 +214,11 @@ public class TestStateModelLeak extends ZkTestBase {
* @param fty
* @param expectStateModelMap
*/
- static void checkStateModelMap(StateModelFactory<? extends StateModel> fty,
- Map<String, String> expectStateModelMap) {
+ static void checkStateModelMap(StateTransitionHandlerFactory<? extends TransitionHandler> fty,
+ Map<PartitionId, String> expectStateModelMap) {
Assert.assertEquals(fty.getPartitionSet().size(), expectStateModelMap.size());
- for (String partition : fty.getPartitionSet()) {
- StateModel stateModel = fty.getStateModel(partition);
+ for (PartitionId partition : fty.getPartitionSet()) {
+ TransitionHandler stateModel = fty.getTransitionHandler(partition);
String actualState = stateModel.getCurrentState();
String expectState = expectStateModelMap.get(partition);
LOG.debug(partition + " actual state: " + actualState + ", expect state: " + expectState);
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
deleted file mode 100644
index 280786c..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.helix.integration.manager;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.List;
-
-import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZkClient;
-
-public interface ZkTestManager {
- ZkClient getZkClient();
-
- List<CallbackHandler> getHandlers();
-
- String getInstanceName();
-
- String getClusterName();
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/manager/zk/MockMultiClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/MockMultiClusterController.java b/helix-core/src/test/java/org/apache/helix/manager/zk/MockMultiClusterController.java
index 7f8b1a3..a39bd84 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/MockMultiClusterController.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/MockMultiClusterController.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.helix.InstanceType;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.participant.DistClusterControllerStateModelFactory;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.log4j.Logger;
@@ -64,7 +65,7 @@ public class MockMultiClusterController extends ZKHelixManager implements Runnab
StateMachineEngine stateMach = getStateMachineEngine();
DistClusterControllerStateModelFactory lsModelFactory =
new DistClusterControllerStateModelFactory(_zkAddress);
- stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
+ stateMach.registerStateModelFactory(StateModelDefId.LeaderStandby, lsModelFactory);
connect();
_startCountDown.countDown();
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/manager/zk/MockParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/MockParticipant.java b/helix-core/src/test/java/org/apache/helix/manager/zk/MockParticipant.java
index f107d3d..3936680 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/MockParticipant.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/MockParticipant.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
import org.apache.helix.HelixConnection;
import org.apache.helix.InstanceType;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
import org.apache.helix.mock.participant.MockMSModelFactory;
@@ -72,17 +73,17 @@ public class MockParticipant extends ZKHelixManager implements Runnable {
public void run() {
try {
StateMachineEngine stateMach = getStateMachineEngine();
- stateMach.registerStateModelFactory("MasterSlave", _msModelFactory);
+ stateMach.registerStateModelFactory(StateModelDefId.MasterSlave, _msModelFactory);
DummyLeaderStandbyStateModelFactory lsModelFactory =
new DummyLeaderStandbyStateModelFactory(10);
DummyOnlineOfflineStateModelFactory ofModelFactory =
new DummyOnlineOfflineStateModelFactory(10);
- stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
- stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
+ stateMach.registerStateModelFactory(StateModelDefId.LeaderStandby, lsModelFactory);
+ stateMach.registerStateModelFactory(StateModelDefId.OnlineOffline, ofModelFactory);
MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory();
- stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);
+ stateMach.registerStateModelFactory(StateModelDefId.from("STORAGE_DEFAULT_SM_SCHEMATA"), schemataFactory);
connect();
_startCountDown.countDown();
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
index 8b5b30c..aba12b4 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
@@ -20,14 +20,11 @@ package org.apache.helix.manager.zk;
*/
import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.helix.HelixException;
import org.apache.helix.NotificationContext;
import org.apache.helix.api.id.MessageId;
import org.apache.helix.manager.zk.DefaultControllerMessageHandlerFactory.DefaultControllerMessageHandler;
-import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.log4j.Logger;
@@ -49,7 +46,7 @@ public class TestDefaultControllerMsgHandlerFactory {
boolean exceptionCaught = false;
try {
- MessageHandler handler = facotry.createHandler(message, context);
+ facotry.createHandler(message, context);
} catch (HelixException e) {
exceptionCaught = true;
}
@@ -58,13 +55,12 @@ public class TestDefaultControllerMsgHandlerFactory {
message = new Message(MessageType.CONTROLLER_MSG, MessageId.from("1"));
exceptionCaught = false;
try {
- MessageHandler handler = facotry.createHandler(message, context);
+ facotry.createHandler(message, context);
} catch (HelixException e) {
exceptionCaught = true;
}
AssertJUnit.assertFalse(exceptionCaught);
- Map<String, String> resultMap = new HashMap<String, String>();
message = new Message(MessageType.NO_OP, MessageId.from("3"));
DefaultControllerMessageHandler defaultHandler =
new DefaultControllerMessageHandler(message, context);
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java b/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java
index da686fe..f0b3507 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java
@@ -20,10 +20,8 @@ package org.apache.helix.messaging;
*/
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import org.apache.helix.HelixException;
@@ -62,9 +60,7 @@ public class TestAsyncCallbackSvc {
}
- @Test(groups = {
- "unitTest"
- })
+ @Test()
public void testAsyncCallbackSvc() throws Exception {
AsyncCallbackService svc = new AsyncCallbackService();
HelixManager manager = new MockHelixManager();
@@ -73,14 +69,14 @@ public class TestAsyncCallbackSvc {
Message msg = new Message(svc.getMessageType(), MessageId.from(UUID.randomUUID().toString()));
msg.setTgtSessionId(SessionId.from(manager.getSessionId()));
try {
- MessageHandler aHandler = svc.createHandler(msg, changeContext);
+ svc.createHandler(msg, changeContext);
} catch (HelixException e) {
AssertJUnit.assertTrue(e.getMessage().indexOf(msg.getMessageId().stringify()) != -1);
}
Message msg2 = new Message("RandomType", MessageId.from(UUID.randomUUID().toString()));
msg2.setTgtSessionId(SessionId.from(manager.getSessionId()));
try {
- MessageHandler aHandler = svc.createHandler(msg2, changeContext);
+ svc.createHandler(msg2, changeContext);
} catch (HelixException e) {
AssertJUnit.assertTrue(e.getMessage().indexOf(msg2.getMessageId().stringify()) != -1);
}
@@ -88,7 +84,7 @@ public class TestAsyncCallbackSvc {
msg3.setTgtSessionId(SessionId.from(manager.getSessionId()));
msg3.setCorrelationId("wfwegw");
try {
- MessageHandler aHandler = svc.createHandler(msg3, changeContext);
+ svc.createHandler(msg3, changeContext);
} catch (HelixException e) {
AssertJUnit.assertTrue(e.getMessage().indexOf(msg3.getMessageId().stringify()) != -1);
}
@@ -107,7 +103,6 @@ public class TestAsyncCallbackSvc {
msg.setCorrelationId(corrId);
MessageHandler aHandler = svc.createHandler(msg, changeContext);
- Map<String, String> resultMap = new HashMap<String, String>();
aHandler.handleMessage();
AssertJUnit.assertTrue(callback.isDone());
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java b/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
index 9880605..4b55935 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
@@ -32,11 +32,12 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.model.Message;
import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.log4j.Logger;
public class DummyProcess {
@@ -49,13 +50,11 @@ public class DummyProcess {
public static final String help = "help";
public static final String transDelay = "transDelay";
public static final String helixManagerType = "helixManagerType";
- // public static final String rootNamespace = "rootNamespace";
private final String _zkConnectString;
private final String _clusterName;
private final String _instanceName;
private DummyStateModelFactory stateModelFactory;
- // private StateMachineEngine genericStateMachineHandler;
private int _transDelayInMs = 0;
private final String _clusterMangerType;
@@ -96,19 +95,16 @@ public class DummyProcess {
new DummyLeaderStandbyStateModelFactory(_transDelayInMs);
DummyOnlineOfflineStateModelFactory stateModelFactory2 =
new DummyOnlineOfflineStateModelFactory(_transDelayInMs);
- // genericStateMachineHandler = new StateMachineEngine();
StateMachineEngine stateMach = manager.getStateMachineEngine();
- stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
- stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory1);
- stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
+ stateMach.registerStateModelFactory(StateModelDefId.MasterSlave, stateModelFactory);
+ stateMach.registerStateModelFactory(StateModelDefId.LeaderStandby, stateModelFactory1);
+ stateMach.registerStateModelFactory(StateModelDefId.OnlineOffline, stateModelFactory2);
manager.connect();
- // manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
- // genericStateMachineHandler);
return manager;
}
- public static class DummyStateModelFactory extends StateModelFactory<DummyStateModel> {
+ public static class DummyStateModelFactory extends StateTransitionHandlerFactory<DummyStateModel> {
int _delay;
public DummyStateModelFactory(int delay) {
@@ -116,7 +112,7 @@ public class DummyProcess {
}
@Override
- public DummyStateModel createNewStateModel(String stateUnitKey) {
+ public DummyStateModel createStateTransitionHandler(PartitionId partition) {
DummyStateModel model = new DummyStateModel();
model.setDelay(_delay);
return model;
@@ -124,7 +120,7 @@ public class DummyProcess {
}
public static class DummyLeaderStandbyStateModelFactory extends
- StateModelFactory<DummyLeaderStandbyStateModel> {
+ StateTransitionHandlerFactory<DummyLeaderStandbyStateModel> {
int _delay;
public DummyLeaderStandbyStateModelFactory(int delay) {
@@ -132,7 +128,7 @@ public class DummyProcess {
}
@Override
- public DummyLeaderStandbyStateModel createNewStateModel(String stateUnitKey) {
+ public DummyLeaderStandbyStateModel createStateTransitionHandler(PartitionId partition) {
DummyLeaderStandbyStateModel model = new DummyLeaderStandbyStateModel();
model.setDelay(_delay);
return model;
@@ -140,7 +136,7 @@ public class DummyProcess {
}
public static class DummyOnlineOfflineStateModelFactory extends
- StateModelFactory<DummyOnlineOfflineStateModel> {
+ StateTransitionHandlerFactory<DummyOnlineOfflineStateModel> {
int _delay;
public DummyOnlineOfflineStateModelFactory(int delay) {
@@ -148,14 +144,14 @@ public class DummyProcess {
}
@Override
- public DummyOnlineOfflineStateModel createNewStateModel(String stateUnitKey) {
+ public DummyOnlineOfflineStateModel createStateTransitionHandler(PartitionId partition) {
DummyOnlineOfflineStateModel model = new DummyOnlineOfflineStateModel();
model.setDelay(_delay);
return model;
}
}
- public static class DummyStateModel extends StateModel {
+ public static class DummyStateModel extends TransitionHandler {
int _transDelay = 0;
public void setDelay(int delay) {
@@ -200,7 +196,7 @@ public class DummyProcess {
}
}
- public static class DummyOnlineOfflineStateModel extends StateModel {
+ public static class DummyOnlineOfflineStateModel extends TransitionHandler {
int _transDelay = 0;
public void setDelay(int delay) {
@@ -231,7 +227,7 @@ public class DummyProcess {
}
}
- public static class DummyLeaderStandbyStateModel extends StateModel {
+ public static class DummyLeaderStandbyStateModel extends TransitionHandler {
int _transDelay = 0;
public void setDelay(int delay) {
@@ -353,7 +349,6 @@ public class DummyProcess {
public static CommandLine processCommandLineArgs(String[] cliArgs) throws Exception {
CommandLineParser cliParser = new GnuParser();
Options cliOptions = constructCommandLineOptions();
- // CommandLine cmd = null;
try {
return cliParser.parse(cliOptions, cliArgs);
@@ -371,8 +366,6 @@ public class DummyProcess {
String zkConnectString = "localhost:2181";
String clusterName = "testCluster";
String instanceName = "localhost_8900";
- String cvFileStr = null;
- // String rootNs = null;
int delay = 0;
if (args.length > 0) {
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapModelFactory.java
index 177e7c4..0e01b60 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapModelFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapModelFactory.java
@@ -1,5 +1,8 @@
package org.apache.helix.mock.participant;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.id.PartitionId;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,12 +22,11 @@ package org.apache.helix.mock.participant;
* under the License.
*/
-import org.apache.helix.participant.statemachine.StateModelFactory;
// mock Bootstrap state model factory
-public class MockBootstrapModelFactory extends StateModelFactory<MockBootstrapStateModel> {
+public class MockBootstrapModelFactory extends StateTransitionHandlerFactory<MockBootstrapStateModel> {
@Override
- public MockBootstrapStateModel createNewStateModel(String partitionKey) {
+ public MockBootstrapStateModel createStateTransitionHandler(PartitionId partitionKey) {
MockBootstrapStateModel model = new MockBootstrapStateModel();
return model;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapStateModel.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapStateModel.java
index 79367db..ceb3442 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapStateModel.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockBootstrapStateModel.java
@@ -20,8 +20,8 @@ package org.apache.helix.mock.participant;
*/
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.log4j.Logger;
@@ -30,7 +30,7 @@ import org.apache.log4j.Logger;
@StateModelInfo(initialState = "OFFLINE", states = {
"ONLINE", "BOOTSTRAP", "OFFLINE", "IDLE"
})
-public class MockBootstrapStateModel extends StateModel {
+public class MockBootstrapStateModel extends TransitionHandler {
private static Logger LOG = Logger.getLogger(MockBootstrapStateModel.class);
// Overwrite the default value of intial state
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
index 9325934..853b157 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
@@ -1,5 +1,8 @@
package org.apache.helix.mock.participant;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.id.PartitionId;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,10 +22,9 @@ package org.apache.helix.mock.participant;
* under the License.
*/
-import org.apache.helix.participant.statemachine.StateModelFactory;
// mock master slave state model factory
-public class MockMSModelFactory extends StateModelFactory<MockMSStateModel> {
+public class MockMSModelFactory extends StateTransitionHandlerFactory<MockMSStateModel> {
private MockTransition _transition;
public MockMSModelFactory() {
@@ -37,14 +39,14 @@ public class MockMSModelFactory extends StateModelFactory<MockMSStateModel> {
_transition = transition;
// set existing transition
- for (String partition : getPartitionSet()) {
- MockMSStateModel stateModel = getStateModel(partition);
+ for (PartitionId partition : getPartitionSet()) {
+ MockMSStateModel stateModel = getTransitionHandler(partition);
stateModel.setTransition(transition);
}
}
@Override
- public MockMSStateModel createNewStateModel(String partitionKey) {
+ public MockMSStateModel createStateTransitionHandler(PartitionId partitionKey) {
MockMSStateModel model = new MockMSStateModel(_transition);
return model;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
index 78d9832..18652cd 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
@@ -20,8 +20,8 @@ package org.apache.helix.mock.participant;
*/
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.log4j.Logger;
@@ -30,7 +30,7 @@ import org.apache.log4j.Logger;
@StateModelInfo(initialState = "OFFLINE", states = {
"MASTER", "SLAVE", "ERROR"
})
-public class MockMSStateModel extends StateModel {
+public class MockMSStateModel extends TransitionHandler {
private static Logger LOG = Logger.getLogger(MockMSStateModel.class);
protected MockTransition _transition;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataModelFactory.java
index 525e764..7cc43ef 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataModelFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataModelFactory.java
@@ -19,12 +19,15 @@ package org.apache.helix.mock.participant;
* under the License.
*/
-import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.id.PartitionId;
-// mock STORAGE_DEFAULT_SM_SCHEMATA state model factory
-public class MockSchemataModelFactory extends StateModelFactory<MockSchemataStateModel> {
+/**
+ * Mock STORAGE_DEFAULT_SM_SCHEMATA state model factory
+ */
+public class MockSchemataModelFactory extends StateTransitionHandlerFactory<MockSchemataStateModel> {
@Override
- public MockSchemataStateModel createNewStateModel(String partitionKey) {
+ public MockSchemataStateModel createStateTransitionHandler(PartitionId partitionKey) {
MockSchemataStateModel model = new MockSchemataStateModel();
return model;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataStateModel.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataStateModel.java
index c3c1fa5..e3c9844 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataStateModel.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockSchemataStateModel.java
@@ -20,8 +20,8 @@ package org.apache.helix.mock.participant;
*/
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.log4j.Logger;
@@ -30,7 +30,7 @@ import org.apache.log4j.Logger;
@StateModelInfo(initialState = "OFFLINE", states = {
"MASTER", "DROPPED", "ERROR"
})
-public class MockSchemataStateModel extends StateModel {
+public class MockSchemataStateModel extends TransitionHandler {
private static Logger LOG = Logger.getLogger(MockSchemataStateModel.class);
@Transition(to = "MASTER", from = "OFFLINE")
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java b/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
index 6f78427..6a3ccd9 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
@@ -107,7 +107,7 @@ public class TestConstraint extends ZkTestBase {
ConstraintItem constraint5 = new ConstraintItem(record.getMapField("constraint5"));
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkclient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkclient));
Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()),
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index efa30da..3efbffb 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -58,7 +58,7 @@ public class MockZKHelixManager implements HelixManager {
_instanceName = instanceName;
_clusterName = clusterName;
_type = type;
- _accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
+ _accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModelFactory.java b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModelFactory.java
index 26d65f0..7055500 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModelFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModelFactory.java
@@ -19,6 +19,7 @@ package org.apache.helix.participant;
* under the License.
*/
+import org.apache.helix.api.id.PartitionId;
import org.testng.annotations.Test;
public class TestDistControllerStateModelFactory {
@@ -27,7 +28,7 @@ public class TestDistControllerStateModelFactory {
public void testDistControllerStateModelFactory() {
DistClusterControllerStateModelFactory factory =
new DistClusterControllerStateModelFactory("localhost:2181");
- DistClusterControllerStateModel stateModel = factory.createNewStateModel("key");
+ DistClusterControllerStateModel stateModel = factory.createStateTransitionHandler(PartitionId.from("key"));
stateModel.onBecomeStandbyFromOffline(null, null);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/participant/statemachine/TestStateModelParser.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/statemachine/TestStateModelParser.java b/helix-core/src/test/java/org/apache/helix/participant/statemachine/TestStateModelParser.java
index 7c128fb..d6968f5 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/statemachine/TestStateModelParser.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/statemachine/TestStateModelParser.java
@@ -22,6 +22,7 @@ package org.apache.helix.participant.statemachine;
import java.lang.reflect.Method;
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.model.Message;
import org.apache.log4j.Logger;
import org.testng.Assert;
@@ -34,7 +35,7 @@ public class TestStateModelParser {
@StateModelInfo(initialState = "OFFLINE", states = {
"MASTER", "SLAVE", "ERROR"
})
- class StateModelUsingAnnotation extends StateModel {
+ class StateModelUsingAnnotation extends TransitionHandler {
@Transition(to = "SLAVE", from = "OFFLINE")
public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
LOG.info("Become SLAVE from OFFLINE");
@@ -58,7 +59,7 @@ public class TestStateModelParser {
}
}
- class StateModelUsingNameConvention extends StateModel {
+ class StateModelUsingNameConvention extends TransitionHandler {
// empty state model
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-examples/src/main/java/org/apache/helix/examples/BootstrapHandler.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/BootstrapHandler.java b/helix-examples/src/main/java/org/apache/helix/examples/BootstrapHandler.java
index f0922f3..d4b2817 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/BootstrapHandler.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/BootstrapHandler.java
@@ -26,30 +26,27 @@ import org.apache.helix.Criteria;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.MessageId;
+import org.apache.helix.api.id.PartitionId;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
-public class BootstrapHandler extends StateModelFactory<StateModel> {
+public class BootstrapHandler extends StateTransitionHandlerFactory<TransitionHandler> {
@Override
- public StateModel createNewStateModel(String stateUnitKey) {
+ public TransitionHandler createStateTransitionHandler(PartitionId stateUnitKey) {
return new BootstrapStateModel(stateUnitKey);
}
@StateModelInfo(initialState = "OFFLINE", states = "{'OFFLINE','SLAVE','MASTER'}")
- public static class BootstrapStateModel extends StateModel {
-
- private final String _stateUnitKey;
-
- public BootstrapStateModel(String stateUnitKey) {
- _stateUnitKey = stateUnitKey;
+ public static class BootstrapStateModel extends TransitionHandler {
+ public BootstrapStateModel(PartitionId stateUnitKey) {
}
@Transition(from = "MASTER", to = "SLAVE")
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-examples/src/main/java/org/apache/helix/examples/BootstrapProcess.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/BootstrapProcess.java b/helix-examples/src/main/java/org/apache/helix/examples/BootstrapProcess.java
index 2506c01..337c484 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/BootstrapProcess.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/BootstrapProcess.java
@@ -36,6 +36,9 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
@@ -43,8 +46,6 @@ import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
/**
* This process does little more than handling the state transition messages.
@@ -77,21 +78,15 @@ public class BootstrapProcess {
private final String zkConnectString;
private final String clusterName;
private final String instanceName;
- private final String stateModelType;
private HelixManager manager;
- // private StateMachineEngine genericStateMachineHandler;
-
- private StateModelFactory<StateModel> stateModelFactory;
- private final int delay;
+ private StateTransitionHandlerFactory<TransitionHandler> stateModelFactory;
public BootstrapProcess(String zkConnectString, String clusterName, String instanceName,
String stateModel, int delay) {
this.zkConnectString = zkConnectString;
this.clusterName = clusterName;
this.instanceName = instanceName;
- stateModelType = stateModel;
- this.delay = delay;
}
public void start() throws Exception {
@@ -100,11 +95,9 @@ public class BootstrapProcess {
zkConnectString);
stateModelFactory = new BootstrapHandler();
- // genericStateMachineHandler = new StateMachineEngine();
- // genericStateMachineHandler.registerStateModelFactory("MasterSlave", stateModelFactory);
StateMachineEngine stateMach = manager.getStateMachineEngine();
- stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+ stateMach.registerStateModelFactory(StateModelDefId.MasterSlave, stateModelFactory);
manager.getMessagingService().registerMessageHandlerFactory(
MessageType.STATE_TRANSITION.toString(), stateMach);
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-examples/src/main/java/org/apache/helix/examples/DummyParticipant.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/DummyParticipant.java b/helix-examples/src/main/java/org/apache/helix/examples/DummyParticipant.java
index c6ab3a4..7dcdd8a 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/DummyParticipant.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/DummyParticipant.java
@@ -23,11 +23,12 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.model.Message;
import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
@@ -36,7 +37,7 @@ public class DummyParticipant {
@StateModelInfo(initialState = "OFFLINE", states = {
"MASTER", "SLAVE", "ERROR"
})
- public static class DummyMSStateModel extends StateModel {
+ public static class DummyMSStateModel extends TransitionHandler {
@Transition(to = "SLAVE", from = "OFFLINE")
public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
PartitionId partitionId = message.getPartitionId();
@@ -86,9 +87,9 @@ public class DummyParticipant {
}
// dummy master slave state model factory
- public static class DummyMSModelFactory extends StateModelFactory<DummyMSStateModel> {
+ public static class DummyMSModelFactory extends StateTransitionHandlerFactory<DummyMSStateModel> {
@Override
- public DummyMSStateModel createNewStateModel(String partitionName) {
+ public DummyMSStateModel createStateTransitionHandler(PartitionId partitionName) {
DummyMSStateModel model = new DummyMSStateModel();
return model;
}
@@ -112,7 +113,7 @@ public class DummyParticipant {
StateMachineEngine stateMach = manager.getStateMachineEngine();
DummyMSModelFactory msModelFactory = new DummyMSModelFactory();
- stateMach.registerStateModelFactory("MasterSlave", msModelFactory);
+ stateMach.registerStateModelFactory(StateModelDefId.MasterSlave, msModelFactory);
manager.connect();
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-examples/src/main/java/org/apache/helix/examples/ExampleProcess.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/ExampleProcess.java b/helix-examples/src/main/java/org/apache/helix/examples/ExampleProcess.java
index 840a963..ac19a27 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/ExampleProcess.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/ExampleProcess.java
@@ -33,11 +33,12 @@ import org.apache.commons.cli.ParseException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.HelixManagerShutdownHook;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.log4j.Logger;
public class ExampleProcess {
@@ -59,7 +60,7 @@ public class ExampleProcess {
private final String stateModelType;
private HelixManager manager;
- private StateModelFactory<StateModel> stateModelFactory;
+ private StateTransitionHandlerFactory<TransitionHandler> stateModelFactory;
private final int delay;
public ExampleProcess(String zkConnectString, String clusterName, String instanceName,
@@ -83,12 +84,9 @@ public class ExampleProcess {
} else if ("LeaderStandby".equalsIgnoreCase(stateModelType)) {
stateModelFactory = new LeaderStandbyStateModelFactory(delay);
}
- // genericStateMachineHandler = new StateMachineEngine();
- // genericStateMachineHandler.registerStateModelFactory(stateModelType,
- // stateModelFactory);
StateMachineEngine stateMach = manager.getStateMachineEngine();
- stateMach.registerStateModelFactory(stateModelType, stateModelFactory);
+ stateMach.registerStateModelFactory(StateModelDefId.from(stateModelType), stateModelFactory);
manager.connect();
manager.getMessagingService().registerMessageHandlerFactory(
MessageType.STATE_TRANSITION.toString(), stateMach);
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-examples/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java b/helix-examples/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
index 43ac5de..74027d4 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/LeaderStandbyStateModelFactory.java
@@ -20,11 +20,12 @@ package org.apache.helix.examples;
*/
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-public class LeaderStandbyStateModelFactory extends StateModelFactory<StateModel> {
+public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactory<TransitionHandler> {
int _delay;
public LeaderStandbyStateModelFactory(int delay) {
@@ -32,13 +33,13 @@ public class LeaderStandbyStateModelFactory extends StateModelFactory<StateModel
}
@Override
- public StateModel createNewStateModel(String stateUnitKey) {
+ public TransitionHandler createStateTransitionHandler(PartitionId stateUnitKey) {
LeaderStandbyStateModel stateModel = new LeaderStandbyStateModel();
stateModel.setDelay(_delay);
return stateModel;
}
- public static class LeaderStandbyStateModel extends StateModel {
+ public static class LeaderStandbyStateModel extends TransitionHandler {
int _transDelay = 0;
public void setDelay(int delay) {
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
index 6075d22..2ac5d37 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
@@ -10,6 +10,8 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Scope;
import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.accessor.ClusterAccessor;
import org.apache.helix.api.config.ClusterConfig;
import org.apache.helix.api.config.ParticipantConfig;
@@ -27,8 +29,6 @@ import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.Transition;
import org.apache.helix.model.builder.AutoRebalanceModeISBuilder;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.log4j.Logger;
@@ -240,7 +240,7 @@ public class LogicalModelExample {
* Dummy state model that just prints state transitions for the lock-unlock model
*/
@StateModelInfo(initialState = "OFFLINE", states = { "LOCKED", "RELEASED", "DROPPED", "ERROR" })
- public static class LockUnlockStateModel extends StateModel {
+ public static class LockUnlockStateModel extends TransitionHandler {
private final PartitionId _partitionId;
/**
@@ -272,9 +272,9 @@ public class LogicalModelExample {
/**
* State model factory for lock-unlock
*/
- public static class LockUnlockFactory extends HelixStateModelFactory<LockUnlockStateModel> {
+ public static class LockUnlockFactory extends StateTransitionHandlerFactory<LockUnlockStateModel> {
@Override
- public LockUnlockStateModel createNewStateModel(PartitionId partitionId) {
+ public LockUnlockStateModel createStateTransitionHandler(PartitionId partitionId) {
return new LockUnlockStateModel(partitionId);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-examples/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java b/helix-examples/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
index 71d1412..2befd61 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/MasterSlaveStateModelFactory.java
@@ -20,11 +20,12 @@ package org.apache.helix.examples;
*/
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-public class MasterSlaveStateModelFactory extends StateModelFactory<StateModel> {
+public class MasterSlaveStateModelFactory extends StateTransitionHandlerFactory<TransitionHandler> {
int _delay;
String _instanceName = "";
@@ -43,15 +44,15 @@ public class MasterSlaveStateModelFactory extends StateModelFactory<StateModel>
}
@Override
- public StateModel createNewStateModel(String partitionName) {
+ public TransitionHandler createStateTransitionHandler(PartitionId partitionName) {
MasterSlaveStateModel stateModel = new MasterSlaveStateModel();
stateModel.setInstanceName(_instanceName);
stateModel.setDelay(_delay);
- stateModel.setPartitionName(partitionName);
+ stateModel.setPartitionName(partitionName.stringify());
return stateModel;
}
- public static class MasterSlaveStateModel extends StateModel {
+ public static class MasterSlaveStateModel extends TransitionHandler {
int _transDelay = 0;
String partitionName;
String _instanceName = "";
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-examples/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java b/helix-examples/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
index daf03a9..f6fb2fa 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/OnlineOfflineStateModelFactory.java
@@ -20,11 +20,12 @@ package org.apache.helix.examples;
*/
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-public class OnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+public class OnlineOfflineStateModelFactory extends StateTransitionHandlerFactory<TransitionHandler> {
int _delay;
public OnlineOfflineStateModelFactory(int delay) {
@@ -32,13 +33,13 @@ public class OnlineOfflineStateModelFactory extends StateModelFactory<StateModel
}
@Override
- public StateModel createNewStateModel(String stateUnitKey) {
+ public TransitionHandler createStateTransitionHandler(PartitionId stateUnitKey) {
OnlineOfflineStateModel stateModel = new OnlineOfflineStateModel();
stateModel.setDelay(_delay);
return stateModel;
}
- public static class OnlineOfflineStateModel extends StateModel {
+ public static class OnlineOfflineStateModel extends TransitionHandler {
int _transDelay = 0;
public void setDelay(int delay) {
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-examples/src/main/java/org/apache/helix/examples/Quickstart.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/Quickstart.java b/helix-examples/src/main/java/org/apache/helix/examples/Quickstart.java
index 2f3a677..d0d881e 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/Quickstart.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/Quickstart.java
@@ -252,7 +252,7 @@ public class Quickstart {
new MasterSlaveStateModelFactory(instanceName);
StateMachineEngine stateMach = manager.getStateMachineEngine();
- stateMach.registerStateModelFactory(STATE_MODEL_NAME.stringify(), stateModelFactory);
+ stateMach.registerStateModelFactory(STATE_MODEL_NAME, stateModelFactory);
manager.connect();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-provisioning/src/main/java/org/apache/helix/provisioning/participant/StatelessServiceStateModel.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/participant/StatelessServiceStateModel.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/participant/StatelessServiceStateModel.java
index f653de8..c179b6c 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/participant/StatelessServiceStateModel.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/participant/StatelessServiceStateModel.java
@@ -20,9 +20,9 @@ package org.apache.helix.provisioning.participant;
*/
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.log4j.Logger;
@@ -30,7 +30,7 @@ import org.apache.log4j.Logger;
@StateModelInfo(initialState = "OFFLINE", states = {
"OFFLINE", "ONLINE", "ERROR"
})
-public class StatelessServiceStateModel extends StateModel {
+public class StatelessServiceStateModel extends TransitionHandler {
private static final Logger LOG = Logger.getLogger(StatelessServiceStateModel.class);
private final StatelessParticipantService _service;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-provisioning/src/main/java/org/apache/helix/provisioning/participant/StatelessServiceStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/participant/StatelessServiceStateModelFactory.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/participant/StatelessServiceStateModelFactory.java
index 19c1488..21f32ce 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/participant/StatelessServiceStateModelFactory.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/participant/StatelessServiceStateModelFactory.java
@@ -19,11 +19,11 @@ package org.apache.helix.provisioning.participant;
* under the License.
*/
+import org.apache.helix.api.StateTransitionHandlerFactory;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
public class StatelessServiceStateModelFactory extends
- HelixStateModelFactory<StatelessServiceStateModel> {
+ StateTransitionHandlerFactory<StatelessServiceStateModel> {
private final StatelessParticipantService _service;
@@ -32,7 +32,7 @@ public class StatelessServiceStateModelFactory extends
}
@Override
- public StatelessServiceStateModel createNewStateModel(PartitionId partitionId) {
+ public StatelessServiceStateModel createStateTransitionHandler(PartitionId partitionId) {
return new StatelessServiceStateModel(partitionId, _service);
}
[4/5] git commit: [HELIX-484] Remove
CallbackHandler/ZkCallbackHandler code duplication,
[HELIX-486] Remove StateModelFactory/HelixStateModelFactory code duplication,
rb=24332
Posted by zz...@apache.org.
[HELIX-484] Remove CallbackHandler/ZkCallbackHandler code duplication, [HELIX-486] Remove StateModelFactory/HelixStateModelFactory code duplication, rb=24332
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4ae1ff79
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4ae1ff79
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4ae1ff79
Branch: refs/heads/master
Commit: 4ae1ff796a524766b22f5fc184952187dc9cc24b
Parents: dd8226b
Author: zzhang <zz...@apache.org>
Authored: Thu Aug 7 13:45:50 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Thu Aug 7 13:45:50 2014 -0700
----------------------------------------------------------------------
.../apache/helix/webapp/TestResetInstance.java | 9 +-
.../apache/helix/webapp/TestResetResource.java | 9 +-
.../org/apache/helix/agent/AgentStateModel.java | 4 +-
.../helix/agent/AgentStateModelFactory.java | 6 +-
.../api/StateTransitionHandlerFactory.java | 114 +++++
.../org/apache/helix/api/TransitionHandler.java | 106 +++++
.../apache/helix/api/id/StateModelDefId.java | 4 +
.../helix/controller/HelixControllerMain.java | 10 +-
.../helix/manager/zk/CallbackHandler.java | 420 -------------------
.../manager/zk/ParticipantManagerHelper.java | 281 -------------
.../helix/manager/zk/ZkHelixParticipant.java | 5 +-
.../handling/HelixStateTransitionHandler.java | 14 +-
.../helix/participant/CustomCodeInvoker.java | 9 +-
.../DistClusterControllerStateModel.java | 4 +-
.../DistClusterControllerStateModelFactory.java | 8 +-
.../participant/GenericLeaderStandbyModel.java | 7 +-
.../GenericLeaderStandbyStateModelFactory.java | 9 +-
.../participant/HelixCustomCodeRunner.java | 8 +-
.../participant/HelixStateMachineEngine.java | 121 ++----
.../helix/participant/StateMachineEngine.java | 38 +-
.../statemachine/HelixStateModelFactory.java | 118 ------
.../HelixStateModelFactoryAdaptor.java | 36 --
.../statemachine/ScheduledTaskStateModel.java | 34 +-
.../ScheduledTaskStateModelFactory.java | 10 +-
.../participant/statemachine/StateModel.java | 104 -----
.../statemachine/StateModelFactory.java | 116 -----
.../statemachine/StateModelParser.java | 11 +-
.../java/org/apache/helix/task/TaskRunner.java | 6 +-
.../org/apache/helix/task/TaskStateModel.java | 4 +-
.../helix/task/TaskStateModelFactory.java | 6 +-
.../org/apache/helix/tools/ClusterSetup.java | 2 +-
.../org/apache/helix/DummyProcessThread.java | 7 +-
.../src/test/java/org/apache/helix/Mocks.java | 6 +-
.../org/apache/helix/TestHelixTaskExecutor.java | 6 +-
.../org/apache/helix/TestHelixTaskHandler.java | 8 +-
.../test/java/org/apache/helix/TestHelper.java | 29 --
.../TestAddStateModelFactoryAfterConnect.java | 5 +-
.../integration/TestBatchMessageWrapper.java | 10 +-
.../TestCorrectnessOnConnectivityLoss.java | 10 +-
.../TestDistributedClusterController.java | 144 -------
.../helix/integration/TestErrorPartition.java | 14 +-
.../helix/integration/TestHelixConnection.java | 10 +-
.../helix/integration/TestMessageThrottle2.java | 17 +-
.../helix/integration/TestMessagingService.java | 35 +-
.../integration/TestMultiClusterController.java | 144 +++++++
.../integration/TestNonOfflineInitState.java | 3 +-
.../TestPartitionLevelTransitionConstraint.java | 16 +-
.../integration/TestPreferenceListAsQueue.java | 14 +-
.../helix/integration/TestResetInstance.java | 9 +-
.../integration/TestResetPartitionState.java | 9 +-
.../helix/integration/TestResetResource.java | 9 +-
.../helix/integration/TestSchedulerMessage.java | 1 -
.../integration/TestStateTransitionTimeout.java | 21 +-
.../helix/integration/TestZkReconnect.java | 15 +-
.../manager/TestConsecutiveZkSessionExpiry.java | 3 +-
.../TestDistributedControllerManager.java | 203 ---------
.../TestHelixMultiClusterController.java | 204 +++++++++
.../manager/TestParticipantManager.java | 3 +-
.../integration/manager/TestStateModelLeak.java | 48 ++-
.../integration/manager/ZkTestManager.java | 35 --
.../manager/zk/MockMultiClusterController.java | 3 +-
.../helix/manager/zk/MockParticipant.java | 9 +-
.../TestDefaultControllerMsgHandlerFactory.java | 8 +-
.../helix/messaging/TestAsyncCallbackSvc.java | 13 +-
.../helix/mock/participant/DummyProcess.java | 37 +-
.../participant/MockBootstrapModelFactory.java | 8 +-
.../participant/MockBootstrapStateModel.java | 4 +-
.../mock/participant/MockMSModelFactory.java | 12 +-
.../mock/participant/MockMSStateModel.java | 4 +-
.../participant/MockSchemataModelFactory.java | 11 +-
.../participant/MockSchemataStateModel.java | 4 +-
.../org/apache/helix/model/TestConstraint.java | 2 +-
.../helix/participant/MockZKHelixManager.java | 2 +-
.../TestDistControllerStateModelFactory.java | 3 +-
.../statemachine/TestStateModelParser.java | 5 +-
.../apache/helix/examples/BootstrapHandler.java | 17 +-
.../apache/helix/examples/BootstrapProcess.java | 17 +-
.../apache/helix/examples/DummyParticipant.java | 13 +-
.../apache/helix/examples/ExampleProcess.java | 12 +-
.../LeaderStandbyStateModelFactory.java | 11 +-
.../helix/examples/LogicalModelExample.java | 10 +-
.../examples/MasterSlaveStateModelFactory.java | 13 +-
.../OnlineOfflineStateModelFactory.java | 11 +-
.../org/apache/helix/examples/Quickstart.java | 2 +-
.../participant/StatelessServiceStateModel.java | 4 +-
.../StatelessServiceStateModelFactory.java | 6 +-
.../java/org/apache/helix/lockmanager/Lock.java | 4 +-
.../apache/helix/lockmanager/LockFactory.java | 6 +-
.../apache/helix/recipes/rabbitmq/Consumer.java | 6 +-
.../recipes/rabbitmq/ConsumerStateModel.java | 5 +-
.../rabbitmq/ConsumerStateModelFactory.java | 40 --
.../ConsumerStateTransitionHandlerFactory.java | 40 ++
.../helix/filestore/FileStoreStateModel.java | 4 +-
.../filestore/FileStoreStateModelFactory.java | 6 +-
.../helix/taskexecution/TaskStateModel.java | 4 +-
.../taskexecution/TaskStateModelFactory.java | 6 +-
.../helix/userdefinedrebalancer/Lock.java | 4 +-
.../userdefinedrebalancer/LockFactory.java | 6 +-
98 files changed, 1008 insertions(+), 2035 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
index a9ecaa0..6db5107 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
@@ -57,12 +57,9 @@ public class TestResetInstance extends AdminTestBase {
MockController controller = new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
- Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
- {
- put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
- put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
- }
- };
+ Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+ errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+ errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
// start mock participants
MockParticipant[] participants = new MockParticipant[n];
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
index a54b0a3..d066f18 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
@@ -57,12 +57,9 @@ public class TestResetResource extends AdminTestBase {
MockController controller = new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
- Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
- {
- put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
- put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
- }
- };
+ Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+ errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+ errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
// start mock participants
MockParticipant[] participants = new MockParticipant[n];
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
----------------------------------------------------------------------
diff --git a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
index d227ac3..509c0b7 100644
--- a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
+++ b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
@@ -30,17 +30,17 @@ import org.apache.helix.ExternalCommand;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.api.State;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.Message;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.log4j.Logger;
@StateModelInfo(initialState = "OFFLINE", states = {})
-public class AgentStateModel extends StateModel {
+public class AgentStateModel extends TransitionHandler {
private static final Logger _logger = Logger.getLogger(AgentStateModel.class);
private static Pattern pattern = Pattern.compile("(\\{.+?\\})");
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
index 69d45ae..4e74405 100644
--- a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
+++ b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModelFactory.java
@@ -19,13 +19,13 @@ package org.apache.helix.agent;
* under the License.
*/
+import org.apache.helix.api.StateTransitionHandlerFactory;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-public class AgentStateModelFactory extends HelixStateModelFactory<AgentStateModel> {
+public class AgentStateModelFactory extends StateTransitionHandlerFactory<AgentStateModel> {
@Override
- public AgentStateModel createNewStateModel(PartitionId partitionKey) {
+ public AgentStateModel createStateTransitionHandler(PartitionId partitionKey) {
AgentStateModel model = new AgentStateModel();
return model;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/api/StateTransitionHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/StateTransitionHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/api/StateTransitionHandlerFactory.java
new file mode 100644
index 0000000..1e2326d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/StateTransitionHandlerFactory.java
@@ -0,0 +1,114 @@
+package org.apache.helix.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.messaging.handling.BatchMessageWrapper;
+
+public abstract class StateTransitionHandlerFactory<T extends TransitionHandler> {
+ /**
+ * map from partitionId to transition-handler
+ */
+ private final ConcurrentMap<PartitionId, T> _transitionHandlerMap =
+ new ConcurrentHashMap<PartitionId, T>();
+
+ /**
+ * map from resourceName to BatchMessageWrapper
+ */
+ private final ConcurrentMap<ResourceId, BatchMessageWrapper> _batchMsgWrapperMap =
+ new ConcurrentHashMap<ResourceId, BatchMessageWrapper>();
+
+ /**
+ * This method will be invoked only once per partition per session
+ * @param partitionId
+ * @return
+ */
+ public abstract T createStateTransitionHandler(PartitionId partitionId);
+
+ /**
+ * Create a state model for a partition
+ * @param partitionId
+ */
+ public T createAndAddSTransitionHandler(PartitionId partitionId) {
+ T stateModel = createStateTransitionHandler(partitionId);
+ _transitionHandlerMap.put(partitionId, stateModel);
+ return stateModel;
+ }
+
+ /**
+ * Get the state model for a partition
+ * @param partitionId
+ * @return state model if exists, null otherwise
+ */
+ public T getTransitionHandler(PartitionId partitionId) {
+ return _transitionHandlerMap.get(partitionId);
+ }
+
+ /**
+ * remove state model for a partition
+ * @param partitionId
+ * @return state model removed or null if not exist
+ */
+ public T removeTransitionHandler(PartitionId partitionId) {
+ return _transitionHandlerMap.remove(partitionId);
+ }
+
+ /**
+ * get partition set
+ * @return partitionId set
+ */
+ public Set<PartitionId> getPartitionSet() {
+ return _transitionHandlerMap.keySet();
+ }
+
+ /**
+ * create a default batch-message-wrapper for a resource
+ * @param resourceId
+ * @return
+ */
+ public BatchMessageWrapper createBatchMessageWrapper(ResourceId resourceId) {
+ return new BatchMessageWrapper();
+ }
+
+ /**
+ * create a batch-message-wrapper for a resource and put it into map
+ * @param resourceId
+ * @return
+ */
+ public BatchMessageWrapper createAndAddBatchMessageWrapper(ResourceId resourceId) {
+ BatchMessageWrapper wrapper = createBatchMessageWrapper(resourceId);
+ _batchMsgWrapperMap.put(resourceId, wrapper);
+ return wrapper;
+ }
+
+ /**
+ * get batch-message-wrapper for a resource
+ * @param resourceId
+ * @return
+ */
+ public BatchMessageWrapper getBatchMessageWrapper(ResourceId resourceId) {
+ return _batchMsgWrapperMap.get(resourceId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/api/TransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/TransitionHandler.java b/helix-core/src/main/java/org/apache/helix/api/TransitionHandler.java
new file mode 100644
index 0000000..ff82a2d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/TransitionHandler.java
@@ -0,0 +1,106 @@
+package org.apache.helix.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+public abstract class TransitionHandler {
+ public static final String DEFAULT_INITIAL_STATE = "OFFLINE";
+ Logger logger = Logger.getLogger(TransitionHandler.class);
+
+ // TODO Get default state from implementation or from state model annotation
+ // StateModel with initial state other than OFFLINE should override this field
+ protected String _currentState = DEFAULT_INITIAL_STATE;
+
+ /**
+ * requested-state is used (e.g. by task-framework) to request next state
+ */
+ protected String _requestedState = null;
+
+ public String getCurrentState() {
+ return _currentState;
+ }
+
+ // @transition(from='from', to='to')
+ public void defaultTransitionHandler() {
+ logger
+ .error("Default default handler. The idea is to invoke this if no transition method is found. Yet to be implemented");
+ }
+
+ public boolean updateState(String newState) {
+ _currentState = newState;
+ return true;
+ }
+
+ /**
+ * Get requested-state
+ * @return requested-state
+ */
+ public String getRequestedState() {
+ return _requestedState;
+ }
+
+ /**
+ * Set requested-state
+ * @param requestedState
+ */
+ public void setRequestedState(String requestedState) {
+ _requestedState = requestedState;
+ }
+
+ /**
+ * Called when error occurs in state transition
+ * TODO:enforce subclass to write this
+ * @param message
+ * @param context
+ * @param error
+ */
+ public void rollbackOnError(Message message, NotificationContext context,
+ StateTransitionError error) {
+
+ logger.error("Default rollback method invoked on error. Error Code: " + error.getCode());
+
+ }
+
+ /**
+ * Called when the state model is reset
+ */
+ public void reset() {
+ logger
+ .warn("Default reset method invoked. Either because the process longer own this resource or session timedout");
+ }
+
+ /**
+ * default transition for drop partition in error state
+ * @param message
+ * @param context
+ * @throws InterruptedException
+ */
+ @Transition(to = "DROPPED", from = "ERROR")
+ public void onBecomeDroppedFromError(Message message, NotificationContext context)
+ throws Exception {
+ logger.info("Default ERROR->DROPPED transition invoked.");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/api/id/StateModelDefId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/id/StateModelDefId.java b/helix-core/src/main/java/org/apache/helix/api/id/StateModelDefId.java
index 7c84f0f..b98cbcd 100644
--- a/helix-core/src/main/java/org/apache/helix/api/id/StateModelDefId.java
+++ b/helix-core/src/main/java/org/apache/helix/api/id/StateModelDefId.java
@@ -26,6 +26,10 @@ import org.codehaus.jackson.annotate.JsonProperty;
public class StateModelDefId extends Id {
public static final StateModelDefId SchedulerTaskQueue = StateModelDefId
.from(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE);
+ public static final StateModelDefId MasterSlave = StateModelDefId.from("MasterSlave");
+ public static final StateModelDefId LeaderStandby = StateModelDefId.from("LeaderStandby");
+ public static final StateModelDefId OnlineOffline = StateModelDefId.from("OnlineOffline");
+
@JsonProperty("id")
private final String _id;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
index 6aa3ab9..ca540c5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
@@ -47,6 +47,7 @@ import org.apache.commons.cli.ParseException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.HelixManagerShutdownHook;
import org.apache.helix.participant.DistClusterControllerStateModelFactory;
import org.apache.helix.participant.StateMachineEngine;
@@ -164,18 +165,11 @@ public class HelixControllerMain {
DistClusterControllerStateModelFactory stateModelFactory =
new DistClusterControllerStateModelFactory(zkConnectString);
- // StateMachineEngine genericStateMachineHandler = new
- // StateMachineEngine();
StateMachineEngine stateMach = manager.getStateMachineEngine();
- stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
- // manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
- // genericStateMachineHandler);
+ stateMach.registerStateModelFactory(StateModelDefId.LeaderStandby, stateModelFactory);
manager.connect();
} else {
logger.error("cluster controller mode:" + controllerMode + " NOT supported");
- // throw new
- // IllegalArgumentException("Unsupported cluster controller mode:" +
- // controllerMode);
}
} catch (Exception e) {
logger.error("Exception while starting controller", e);
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
deleted file mode 100644
index cdb2845..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ /dev/null
@@ -1,420 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import static org.apache.helix.HelixConstants.ChangeType.CONFIG;
-import static org.apache.helix.HelixConstants.ChangeType.CURRENT_STATE;
-import static org.apache.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
-import static org.apache.helix.HelixConstants.ChangeType.IDEAL_STATE;
-import static org.apache.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
-import static org.apache.helix.HelixConstants.ChangeType.MESSAGE;
-import static org.apache.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.ControllerChangeListener;
-import org.apache.helix.CurrentStateChangeListener;
-import org.apache.helix.ExternalViewChangeListener;
-import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.IdealStateChangeListener;
-import org.apache.helix.InstanceConfigChangeListener;
-import org.apache.helix.LiveInstanceChangeListener;
-import org.apache.helix.MessageListener;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.NotificationContext.Type;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.ScopedConfigChangeListener;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-public class CallbackHandler implements IZkChildListener, IZkDataListener
-
-{
- private static Logger logger = Logger.getLogger(CallbackHandler.class);
-
- /**
- * define the next possible notification types
- */
- private static Map<Type, List<Type>> nextNotificationType = new HashMap<Type, List<Type>>();
- static {
- nextNotificationType.put(Type.INIT, Arrays.asList(Type.CALLBACK, Type.FINALIZE));
- nextNotificationType.put(Type.CALLBACK, Arrays.asList(Type.CALLBACK, Type.FINALIZE));
- nextNotificationType.put(Type.FINALIZE, Arrays.asList(Type.INIT));
- }
-
- private final String _path;
- private final Object _listener;
- private final EventType[] _eventTypes;
- private final HelixDataAccessor _accessor;
- private final ChangeType _changeType;
- private final ZkClient _zkClient;
- private final AtomicLong _lastNotificationTimeStamp;
- private final HelixManager _manager;
- private final PropertyKey _propertyKey;
-
- /**
- * maintain the expected notification types
- * this is fix for HELIX-195: race condition between FINALIZE callbacks and Zk callbacks
- */
- private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE);
-
- public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey,
- Object listener, EventType[] eventTypes, ChangeType changeType) {
- if (listener == null) {
- throw new HelixException("listener could not be null");
- }
-
- this._manager = manager;
- this._accessor = manager.getHelixDataAccessor();
- this._zkClient = client;
- this._propertyKey = propertyKey;
- this._path = propertyKey.getPath();
- this._listener = listener;
- this._eventTypes = eventTypes;
- this._changeType = changeType;
- this._lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
- init();
- }
-
- public Object getListener() {
- return _listener;
- }
-
- public String getPath() {
- return _path;
- }
-
- public void invoke(NotificationContext changeContext) throws Exception {
- // This allows the listener to work with one change at a time
- synchronized (_manager) {
- Type type = changeContext.getType();
- if (!_expectTypes.contains(type)) {
- logger.warn("Skip processing callbacks for listener: " + _listener + ", path: " + _path
- + ", expected types: " + _expectTypes + " but was " + type);
- return;
- }
- _expectTypes = nextNotificationType.get(type);
-
- // Builder keyBuilder = _accessor.keyBuilder();
- long start = System.currentTimeMillis();
- if (logger.isInfoEnabled()) {
- logger.info(Thread.currentThread().getId() + " START:INVOKE " + _path + " listener:"
- + _listener.getClass().getCanonicalName());
- }
-
- if (_changeType == IDEAL_STATE) {
-
- IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, true);
- List<IdealState> idealStates = _accessor.getChildValues(_propertyKey);
-
- idealStateChangeListener.onIdealStateChange(idealStates, changeContext);
-
- } else if (_changeType == ChangeType.INSTANCE_CONFIG) {
- subscribeForChanges(changeContext, _path, true, true);
- InstanceConfigChangeListener listener = (InstanceConfigChangeListener) _listener;
- List<InstanceConfig> configs = _accessor.getChildValues(_propertyKey);
- listener.onInstanceConfigChange(configs, changeContext);
- } else if (_changeType == CONFIG) {
- subscribeForChanges(changeContext, _path, true, true);
- ScopedConfigChangeListener listener = (ScopedConfigChangeListener) _listener;
- List<HelixProperty> configs = _accessor.getChildValues(_propertyKey);
- listener.onConfigChange(configs, changeContext);
- } else if (_changeType == LIVE_INSTANCE) {
- LiveInstanceChangeListener liveInstanceChangeListener =
- (LiveInstanceChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, true);
- List<LiveInstance> liveInstances = _accessor.getChildValues(_propertyKey);
-
- liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
-
- } else if (_changeType == CURRENT_STATE) {
- CurrentStateChangeListener currentStateChangeListener =
- (CurrentStateChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, true);
- String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
-
- List<CurrentState> currentStates = _accessor.getChildValues(_propertyKey);
-
- currentStateChangeListener.onStateChange(instanceName, currentStates, changeContext);
-
- } else if (_changeType == MESSAGE) {
- MessageListener messageListener = (MessageListener) _listener;
- subscribeForChanges(changeContext, _path, true, false);
- String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
- List<Message> messages = _accessor.getChildValues(_propertyKey);
-
- messageListener.onMessage(instanceName, messages, changeContext);
-
- } else if (_changeType == MESSAGES_CONTROLLER) {
- MessageListener messageListener = (MessageListener) _listener;
- subscribeForChanges(changeContext, _path, true, false);
- List<Message> messages = _accessor.getChildValues(_propertyKey);
-
- messageListener.onMessage(_manager.getInstanceName(), messages, changeContext);
-
- } else if (_changeType == EXTERNAL_VIEW) {
- ExternalViewChangeListener externalViewListener = (ExternalViewChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, true);
- List<ExternalView> externalViewList = _accessor.getChildValues(_propertyKey);
-
- externalViewListener.onExternalViewChange(externalViewList, changeContext);
- } else if (_changeType == ChangeType.CONTROLLER) {
- ControllerChangeListener controllerChangelistener = (ControllerChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, false);
- controllerChangelistener.onControllerChange(changeContext);
- }
-
- long end = System.currentTimeMillis();
- if (logger.isInfoEnabled()) {
- logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path + " listener:"
- + _listener.getClass().getCanonicalName() + " Took: " + (end - start) + "ms");
- }
- }
- }
-
- private void subscribeChildChange(String path, NotificationContext context) {
- NotificationContext.Type type = context.getType();
- if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
- logger.info(_manager.getInstanceName() + " subscribes child-change. path: " + path
- + ", listener: " + _listener);
- _zkClient.subscribeChildChanges(path, this);
- } else if (type == NotificationContext.Type.FINALIZE) {
- logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + path
- + ", listener: " + _listener);
-
- _zkClient.unsubscribeChildChanges(path, this);
- }
- }
-
- private void subscribeDataChange(String path, NotificationContext context) {
- NotificationContext.Type type = context.getType();
- if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
- if (logger.isDebugEnabled()) {
- logger.debug(_manager.getInstanceName() + " subscribe data-change. path: " + path
- + ", listener: " + _listener);
- }
- _zkClient.subscribeDataChanges(path, this);
-
- } else if (type == NotificationContext.Type.FINALIZE) {
- logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + path
- + ", listener: " + _listener);
-
- _zkClient.unsubscribeDataChanges(path, this);
- }
- }
-
- // TODO watchParent is always true. consider remove it
- private void subscribeForChanges(NotificationContext context, String path, boolean watchParent,
- boolean watchChild) {
- if (watchParent) {
- subscribeChildChange(path, context);
- }
-
- if (watchChild) {
- try {
- switch (_changeType) {
- case CURRENT_STATE:
- case IDEAL_STATE:
- case EXTERNAL_VIEW: {
- // check if bucketized
- BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
- List<ZNRecord> records = baseAccessor.getChildren(path, null, 0);
- for (ZNRecord record : records) {
- HelixProperty property = new HelixProperty(record);
- String childPath = path + "/" + record.getId();
-
- int bucketSize = property.getBucketSize();
- if (bucketSize > 0) {
- // subscribe both data-change and child-change on bucketized parent node
- // data-change gives a delete-callback which is used to remove watch
- subscribeChildChange(childPath, context);
- subscribeDataChange(childPath, context);
-
- // subscribe data-change on bucketized child
- List<String> bucketizedChildNames = _zkClient.getChildren(childPath);
- if (bucketizedChildNames != null) {
- for (String bucketizedChildName : bucketizedChildNames) {
- String bucketizedChildPath = childPath + "/" + bucketizedChildName;
- subscribeDataChange(bucketizedChildPath, context);
- }
- }
- } else {
- subscribeDataChange(childPath, context);
- }
- }
- break;
- }
- default: {
- List<String> childNames = _zkClient.getChildren(path);
- if (childNames != null) {
- for (String childName : childNames) {
- String childPath = path + "/" + childName;
- subscribeDataChange(childPath, context);
- }
- }
- break;
- }
- }
- } catch (ZkNoNodeException e) {
- logger.warn("fail to subscribe child/data change. path: " + path + ", listener: "
- + _listener, e);
- }
- }
-
- }
-
- public EventType[] getEventTypes() {
- return _eventTypes;
- }
-
- /**
- * Invoke the listener so that it sets up the initial values from the zookeeper if any
- * exists
- */
- public void init() {
- updateNotificationTime(System.nanoTime());
- try {
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.INIT);
- invoke(changeContext);
- } catch (Exception e) {
- String msg = "Exception while invoking init callback for listener:" + _listener;
- ZKExceptionHandler.getInstance().handle(msg, e);
- }
- }
-
- @Override
- public void handleDataChange(String dataPath, Object data) {
- try {
- updateNotificationTime(System.nanoTime());
- if (dataPath != null && dataPath.startsWith(_path)) {
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.CALLBACK);
- invoke(changeContext);
- }
- } catch (Exception e) {
- String msg =
- "exception in handling data-change. path: " + dataPath + ", listener: " + _listener;
- ZKExceptionHandler.getInstance().handle(msg, e);
- }
- }
-
- @Override
- public void handleDataDeleted(String dataPath) {
- try {
- updateNotificationTime(System.nanoTime());
- if (dataPath != null && dataPath.startsWith(_path)) {
- logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + dataPath
- + ", listener: " + _listener);
- _zkClient.unsubscribeDataChanges(dataPath, this);
-
- // only needed for bucketized parent, but OK if we don't have child-change
- // watch on the bucketized parent path
- logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + dataPath
- + ", listener: " + _listener);
- _zkClient.unsubscribeChildChanges(dataPath, this);
- // No need to invoke() since this event will handled by child-change on parent-node
- // NotificationContext changeContext = new NotificationContext(_manager);
- // changeContext.setType(NotificationContext.Type.CALLBACK);
- // invoke(changeContext);
- }
- } catch (Exception e) {
- String msg =
- "exception in handling data-delete-change. path: " + dataPath + ", listener: "
- + _listener;
- ZKExceptionHandler.getInstance().handle(msg, e);
- }
- }
-
- @Override
- public void handleChildChange(String parentPath, List<String> currentChilds) {
- try {
- updateNotificationTime(System.nanoTime());
- if (parentPath != null && parentPath.startsWith(_path)) {
- NotificationContext changeContext = new NotificationContext(_manager);
-
- if (currentChilds == null) {
- // parentPath has been removed
- if (parentPath.equals(_path)) {
- // _path has been removed, remove this listener
- _manager.removeListener(_propertyKey, _listener);
- }
- changeContext.setType(NotificationContext.Type.FINALIZE);
- } else {
- changeContext.setType(NotificationContext.Type.CALLBACK);
- }
- invoke(changeContext);
- }
- } catch (Exception e) {
- String msg =
- "exception in handling child-change. instance: " + _manager.getInstanceName()
- + ", parentPath: " + parentPath + ", listener: " + _listener;
- ZKExceptionHandler.getInstance().handle(msg, e);
- }
- }
-
- /**
- * Invoke the listener for the last time so that the listener could clean up resources
- */
- public void reset() {
- try {
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.FINALIZE);
- invoke(changeContext);
- } catch (Exception e) {
- String msg = "Exception while resetting the listener:" + _listener;
- ZKExceptionHandler.getInstance().handle(msg, e);
- }
- }
-
- private void updateNotificationTime(long nanoTime) {
- long l = _lastNotificationTimeStamp.get();
- while (nanoTime > l) {
- boolean b = _lastNotificationTimeStamp.compareAndSet(l, nanoTime);
- if (b) {
- break;
- } else {
- l = _lastNotificationTimeStamp.get();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
deleted file mode 100644
index c1d856d..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
+++ /dev/null
@@ -1,281 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.lang.management.ManagementFactory;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.apache.helix.AccessOption;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
-import org.apache.helix.LiveInstanceInfoProvider;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.messaging.DefaultMessagingService;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
-
-/**
- * helper class for participant-manager
- */
-public class ParticipantManagerHelper {
- private static Logger LOG = Logger.getLogger(ParticipantManagerHelper.class);
-
- final ZkClient _zkclient;
- final HelixManager _manager;
- final PropertyKey.Builder _keyBuilder;
- final String _clusterName;
- final String _instanceName;
- final String _sessionId;
- final int _sessionTimeout;
- final ConfigAccessor _configAccessor;
- final InstanceType _instanceType;
- final HelixAdmin _helixAdmin;
- final ZKHelixDataAccessor _dataAccessor;
- final DefaultMessagingService _messagingService;
- final StateMachineEngine _stateMachineEngine;
- final LiveInstanceInfoProvider _liveInstanceInfoProvider;
-
- public ParticipantManagerHelper(HelixManager manager, ZkClient zkclient, int sessionTimeout,
- LiveInstanceInfoProvider liveInstanceInfoProvider) {
- _zkclient = zkclient;
- _manager = manager;
- _clusterName = manager.getClusterName();
- _instanceName = manager.getInstanceName();
- _keyBuilder = new PropertyKey.Builder(_clusterName);
- _sessionId = manager.getSessionId();
- _sessionTimeout = sessionTimeout;
- _configAccessor = manager.getConfigAccessor();
- _instanceType = manager.getInstanceType();
- _helixAdmin = manager.getClusterManagmentTool();
- _dataAccessor = (ZKHelixDataAccessor) manager.getHelixDataAccessor();
- _messagingService = (DefaultMessagingService) manager.getMessagingService();
- _stateMachineEngine = manager.getStateMachineEngine();
- _liveInstanceInfoProvider = liveInstanceInfoProvider;
- }
-
- public void joinCluster() {
- // Read cluster config and see if instance can auto join the cluster
- boolean autoJoin = false;
- try {
- HelixConfigScope scope =
- new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(
- _manager.getClusterName()).build();
- autoJoin =
- Boolean.parseBoolean(_configAccessor.get(scope,
- ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
- LOG.info("instance: " + _instanceName + " auto-joining " + _clusterName + " is " + autoJoin);
- } catch (Exception e) {
- // autoJoin is false
- }
-
- if (!ZKUtil.isInstanceSetup(_zkclient, _clusterName, _instanceName, _instanceType)) {
- if (!autoJoin) {
- throw new HelixException("Initial cluster structure is not set up for instance: "
- + _instanceName + ", instanceType: " + _instanceType);
- } else {
- LOG.info(_instanceName + " is auto-joining cluster: " + _clusterName);
- InstanceConfig instanceConfig = new InstanceConfig(_instanceName);
- String hostName = _instanceName;
- String port = "";
- int lastPos = _instanceName.lastIndexOf("_");
- if (lastPos > 0) {
- hostName = _instanceName.substring(0, lastPos);
- port = _instanceName.substring(lastPos + 1);
- }
- instanceConfig.setHostName(hostName);
- instanceConfig.setPort(port);
- instanceConfig.setInstanceEnabled(true);
- _helixAdmin.addInstance(_clusterName, instanceConfig);
- }
- }
- }
-
- public void createLiveInstance() {
- String liveInstancePath = _keyBuilder.liveInstance(_instanceName).getPath();
- LiveInstance liveInstance = new LiveInstance(_instanceName);
- liveInstance.setSessionId(_sessionId);
- liveInstance.setHelixVersion(_manager.getVersion());
- liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
-
- // LiveInstanceInfoProvider liveInstanceInfoProvider = _manager._liveInstanceInfoProvider;
- if (_liveInstanceInfoProvider != null) {
- LOG.info("invoke liveInstanceInfoProvider");
- ZNRecord additionalLiveInstanceInfo =
- _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
- if (additionalLiveInstanceInfo != null) {
- additionalLiveInstanceInfo.merge(liveInstance.getRecord());
- ZNRecord mergedLiveInstance = new ZNRecord(additionalLiveInstanceInfo, _instanceName);
- liveInstance = new LiveInstance(mergedLiveInstance);
- LOG.info("instanceName: " + _instanceName + ", mergedLiveInstance: " + liveInstance);
- }
- }
-
- boolean retry;
- do {
- retry = false;
- try {
- _zkclient.createEphemeral(liveInstancePath, liveInstance.getRecord());
- } catch (ZkNodeExistsException e) {
- LOG.warn("found another instance with same instanceName: " + _instanceName + " in cluster "
- + _clusterName);
-
- Stat stat = new Stat();
- ZNRecord record = _zkclient.readData(liveInstancePath, stat, true);
- if (record == null) {
- /**
- * live-instance is gone as we check it, retry create live-instance
- */
- retry = true;
- } else {
- String ephemeralOwner = Long.toHexString(stat.getEphemeralOwner());
- if (ephemeralOwner.equals(_sessionId)) {
- /**
- * update sessionId field in live-instance if necessary
- */
- LiveInstance curLiveInstance = new LiveInstance(record);
- if (!curLiveInstance.getTypedSessionId().stringify().equals(_sessionId)) {
- /**
- * in last handle-new-session,
- * live-instance is created by new zkconnection with stale session-id inside
- * just update session-id field
- */
- LOG.info("overwriting session-id by ephemeralOwner: " + ephemeralOwner
- + ", old-sessionId: " + curLiveInstance.getTypedSessionId() + ", new-sessionId: "
- + _sessionId);
-
- curLiveInstance.setSessionId(_sessionId);
- _zkclient.writeData(liveInstancePath, curLiveInstance.getRecord());
- }
- } else {
- /**
- * wait for a while, in case previous helix-participant exits unexpectedly
- * and its live-instance still hangs around until session timeout
- */
- try {
- TimeUnit.MILLISECONDS.sleep(_sessionTimeout + 5000);
- } catch (InterruptedException ex) {
- LOG.warn("Sleep interrupted while waiting for previous live-instance to go away.", ex);
- }
- /**
- * give a last try after exit while loop
- */
- retry = true;
- break;
- }
- }
- }
- } while (retry);
-
- /**
- * give a last shot
- */
- if (retry) {
- try {
- _zkclient.createEphemeral(liveInstancePath, liveInstance.getRecord());
- } catch (Exception e) {
- String errorMessage =
- "instance: " + _instanceName + " already has a live-instance in cluster "
- + _clusterName;
- LOG.error(errorMessage);
- throw new HelixException(errorMessage);
- }
- }
- }
-
- /**
- * carry over current-states from last sessions
- * set to initial state for current session only when state doesn't exist in current session
- */
- public void carryOverPreviousCurrentState() {
- List<String> sessions = _dataAccessor.getChildNames(_keyBuilder.sessions(_instanceName));
-
- for (String session : sessions) {
- if (session.equals(_sessionId)) {
- continue;
- }
-
- List<CurrentState> lastCurStates =
- _dataAccessor.getChildValues(_keyBuilder.currentStates(_instanceName, session));
-
- for (CurrentState lastCurState : lastCurStates) {
- LOG.info("Carrying over old session: " + session + ", resource: " + lastCurState.getId()
- + " to current session: " + _sessionId);
- String stateModelDefRef = lastCurState.getStateModelDefRef();
- if (stateModelDefRef == null) {
- LOG.error("skip carry-over because previous current state doesn't have a state model definition. previous current-state: "
- + lastCurState);
- continue;
- }
- StateModelDefinition stateModel =
- _dataAccessor.getProperty(_keyBuilder.stateModelDef(stateModelDefRef));
-
- String curStatePath =
- _keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName())
- .getPath();
- _dataAccessor.getBaseDataAccessor().update(curStatePath,
- new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialState(), lastCurState),
- AccessOption.PERSISTENT);
- }
- }
-
- /**
- * remove previous current state parent nodes
- */
- for (String session : sessions) {
- if (session.equals(_sessionId)) {
- continue;
- }
-
- String path = _keyBuilder.currentStates(_instanceName, session).getPath();
- LOG.info("Removing current states from previous sessions. path: " + path);
- _zkclient.deleteRecursive(path);
- }
- }
-
- public void setupMsgHandler() throws Exception {
- _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
- _stateMachineEngine);
- _manager.addMessageListener(_messagingService.getExecutor(), _instanceName);
-
- ScheduledTaskStateModelFactory stStateModelFactory =
- new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
- _stateMachineEngine.registerStateModelFactory(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory);
- _messagingService.onConnected();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
index af50eb7..81810de 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
@@ -44,6 +44,7 @@ import org.apache.helix.api.config.ParticipantConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.Id;
import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
@@ -348,8 +349,8 @@ public class ZkHelixParticipant implements HelixParticipant {
ScheduledTaskStateModelFactory stStateModelFactory =
new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
- _stateMachineEngine.registerStateModelFactory(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory);
+ _stateMachineEngine.registerStateModelFactory(StateModelDefId.SchedulerTaskQueue,
+ stStateModelFactory);
_messagingService.onConnected();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 1bb6506..4ede2a4 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -43,14 +43,14 @@ import org.apache.helix.ZNRecordBucketizer;
import org.apache.helix.ZNRecordDelta;
import org.apache.helix.ZNRecordDelta.MergeOperation;
import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SessionId;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.Attributes;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelParser;
import org.apache.helix.participant.statemachine.StateTransitionError;
import org.apache.helix.util.StatusUpdateUtil;
@@ -66,16 +66,16 @@ public class HelixStateTransitionHandler extends MessageHandler {
}
private static final Logger logger = Logger.getLogger(HelixStateTransitionHandler.class);
- private final StateModel _stateModel;
+ private final TransitionHandler _stateModel;
StatusUpdateUtil _statusUpdateUtil;
private final StateModelParser _transitionMethodFinder;
private final CurrentState _currentStateDelta;
private final HelixManager _manager;
- private final StateModelFactory<? extends StateModel> _stateModelFactory;
+ private final StateTransitionHandlerFactory<? extends TransitionHandler> _stateModelFactory;
volatile boolean _isTimeout = false;
- public HelixStateTransitionHandler(StateModelFactory<? extends StateModel> stateModelFactory,
- StateModel stateModel, Message message, NotificationContext context,
+ public HelixStateTransitionHandler(StateTransitionHandlerFactory<? extends TransitionHandler> stateModelFactory,
+ TransitionHandler stateModel, Message message, NotificationContext context,
CurrentState currentStateDelta) {
super(message, context);
_stateModel = stateModel;
@@ -206,7 +206,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
deltaList.add(delta);
_currentStateDelta.setDeltaList(deltaList);
- _stateModelFactory.removeStateModel(partitionId.stringify());
+ _stateModelFactory.removeTransitionHandler(partitionId);
} else {
// if the partition is not to be dropped, update _stateModel to the TO_STATE
_stateModel.updateState(toState.toString());
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
index 6c96629..17adf7f 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
@@ -29,6 +29,7 @@ import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.Type;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.id.PartitionId;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
@@ -39,9 +40,9 @@ public class CustomCodeInvoker implements LiveInstanceChangeListener, InstanceCo
ExternalViewChangeListener {
private static Logger LOG = Logger.getLogger(CustomCodeInvoker.class);
private final CustomCodeCallbackHandler _callback;
- private final String _partitionKey;
+ private final PartitionId _partitionKey;
- public CustomCodeInvoker(CustomCodeCallbackHandler callback, String partitionKey) {
+ public CustomCodeInvoker(CustomCodeCallbackHandler callback, PartitionId partitionKey) {
_callback = callback;
_partitionKey = partitionKey;
}
@@ -60,7 +61,7 @@ public class CustomCodeInvoker implements LiveInstanceChangeListener, InstanceCo
String sessionId = manager.getSessionId();
// get resource name from partition key: "PARTICIPANT_LEADER_XXX_0"
- String resourceName = _partitionKey.substring(0, _partitionKey.lastIndexOf('_'));
+ String resourceName = _partitionKey.stringify().substring(0, _partitionKey.stringify().lastIndexOf('_'));
CurrentState curState =
accessor.getProperty(keyBuilder.currentState(instance, sessionId, resourceName));
@@ -68,7 +69,7 @@ public class CustomCodeInvoker implements LiveInstanceChangeListener, InstanceCo
return;
}
- String state = curState.getState(_partitionKey);
+ String state = curState.getState(_partitionKey.stringify());
if (state == null || !state.equalsIgnoreCase("LEADER")) {
return;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
index 0c2eb7c..4579810 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
@@ -23,8 +23,8 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.StateModelParser;
import org.apache.helix.participant.statemachine.StateTransitionError;
@@ -34,7 +34,7 @@ import org.apache.log4j.Logger;
@StateModelInfo(initialState = "OFFLINE", states = {
"LEADER", "STANDBY"
})
-public class DistClusterControllerStateModel extends StateModel {
+public class DistClusterControllerStateModel extends TransitionHandler {
private static Logger logger = Logger.getLogger(DistClusterControllerStateModel.class);
private HelixManager _controller = null;
private final String _zkAddr;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
index a367c81..5f21e90 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModelFactory.java
@@ -1,5 +1,8 @@
package org.apache.helix.participant;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.id.PartitionId;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,10 +22,9 @@ package org.apache.helix.participant;
* under the License.
*/
-import org.apache.helix.participant.statemachine.StateModelFactory;
public class DistClusterControllerStateModelFactory extends
- StateModelFactory<DistClusterControllerStateModel> {
+ StateTransitionHandlerFactory<DistClusterControllerStateModel> {
private final String _zkAddr;
public DistClusterControllerStateModelFactory(String zkAddr) {
@@ -30,7 +32,7 @@ public class DistClusterControllerStateModelFactory extends
}
@Override
- public DistClusterControllerStateModel createNewStateModel(String stateUnitKey) {
+ public DistClusterControllerStateModel createStateTransitionHandler(PartitionId partition) {
return new DistClusterControllerStateModel(_zkAddr);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
index 3866cf5..ba23a43 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
@@ -25,8 +25,9 @@ import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.log4j.Logger;
@@ -34,14 +35,14 @@ import org.apache.log4j.Logger;
@StateModelInfo(initialState = "OFFLINE", states = {
"LEADER", "STANDBY"
})
-public class GenericLeaderStandbyModel extends StateModel {
+public class GenericLeaderStandbyModel extends TransitionHandler {
private static Logger LOG = Logger.getLogger(GenericLeaderStandbyModel.class);
private final CustomCodeInvoker _particHolder;
private final List<ChangeType> _notificationTypes;
public GenericLeaderStandbyModel(CustomCodeCallbackHandler callback,
- List<ChangeType> notificationTypes, String partitionKey) {
+ List<ChangeType> notificationTypes, PartitionId partitionKey) {
_particHolder = new CustomCodeInvoker(callback, partitionKey);
_notificationTypes = notificationTypes;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
index 51c91cc..c5d7390 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyStateModelFactory.java
@@ -22,10 +22,11 @@ package org.apache.helix.participant;
import java.util.List;
import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.id.PartitionId;
public class GenericLeaderStandbyStateModelFactory extends
- StateModelFactory<GenericLeaderStandbyModel> {
+ StateTransitionHandlerFactory<GenericLeaderStandbyModel> {
private final CustomCodeCallbackHandler _callback;
private final List<ChangeType> _notificationTypes;
@@ -39,7 +40,7 @@ public class GenericLeaderStandbyStateModelFactory extends
}
@Override
- public GenericLeaderStandbyModel createNewStateModel(String partitionKey) {
- return new GenericLeaderStandbyModel(_callback, _notificationTypes, partitionKey);
+ public GenericLeaderStandbyModel createStateTransitionHandler(PartitionId partition) {
+ return new GenericLeaderStandbyModel(_callback, _notificationTypes, partition);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
index 2f169cc..97b266e 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
@@ -55,7 +55,6 @@ import org.apache.log4j.Logger;
* </code>
*/
public class HelixCustomCodeRunner {
- private static final String LEADER_STANDBY = "LeaderStandby";
private static Logger LOG = Logger.getLogger(HelixCustomCodeRunner.class);
private static String PARTICIPANT_LEADER = "PARTICIPANT_LEADER";
@@ -130,7 +129,8 @@ public class HelixCustomCodeRunner {
_stateModelFty = new GenericLeaderStandbyStateModelFactory(_callback, _notificationTypes);
StateMachineEngine stateMach = _manager.getStateMachineEngine();
- stateMach.registerStateModelFactory(LEADER_STANDBY, _stateModelFty, _resourceName);
+ stateMach.registerStateModelFactory(StateModelDefId.LeaderStandby, _resourceName,
+ _stateModelFty);
ZkClient zkClient = null;
try {
// manually add ideal state for participant leader using LeaderStandby
@@ -148,7 +148,7 @@ public class HelixCustomCodeRunner {
idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
idealState.setReplicas(StateModelToken.ANY_LIVEINSTANCE.toString());
idealState.setNumPartitions(1);
- idealState.setStateModelDefId(StateModelDefId.from(LEADER_STANDBY));
+ idealState.setStateModelDefId(StateModelDefId.LeaderStandby);
idealState.setStateModelFactoryId(StateModelFactoryId.from(_resourceName));
List<String> prefList =
new ArrayList<String>(Arrays.asList(StateModelToken.ANY_LIVEINSTANCE.toString()));
@@ -177,7 +177,7 @@ public class HelixCustomCodeRunner {
*/
public void stop() {
LOG.info("Removing stateModelFactory for " + _resourceName);
- _manager.getStateMachineEngine().removeStateModelFactory(LEADER_STANDBY, _stateModelFty,
+ _manager.getStateMachineEngine().removeStateModelFactory(StateModelDefId.LeaderStandby,
_resourceName);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 95afb70..56769b5 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -32,6 +32,8 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.MapKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
@@ -46,10 +48,6 @@ import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-import org.apache.helix.participant.statemachine.HelixStateModelFactoryAdaptor;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelParser;
import org.apache.log4j.Logger;
@@ -58,27 +56,26 @@ public class HelixStateMachineEngine implements StateMachineEngine {
/**
* Map of StateModelDefId to map of FactoryName to StateModelFactory
- * TODO change to use StateModelDefId and HelixStateModelFactory
*/
- private final Map<String, Map<String, StateModelFactory<? extends StateModel>>> _stateModelFactoryMap;
+ private final Map<StateModelDefId, Map<String, StateTransitionHandlerFactory<? extends TransitionHandler>>> _stateModelFactoryMap;
private final StateModelParser _stateModelParser;
private final HelixManager _manager;
- private final ConcurrentHashMap<String, StateModelDefinition> _stateModelDefs;
+ private final ConcurrentHashMap<StateModelDefId, StateModelDefinition> _stateModelDefs;
public HelixStateMachineEngine(HelixManager manager) {
_stateModelParser = new StateModelParser();
_manager = manager;
_stateModelFactoryMap =
- new ConcurrentHashMap<String, Map<String, StateModelFactory<? extends StateModel>>>();
- _stateModelDefs = new ConcurrentHashMap<String, StateModelDefinition>();
+ new ConcurrentHashMap<StateModelDefId, Map<String, StateTransitionHandlerFactory<? extends TransitionHandler>>>();
+ _stateModelDefs = new ConcurrentHashMap<StateModelDefId, StateModelDefinition>();
}
- public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName) {
+ public StateTransitionHandlerFactory<? extends TransitionHandler> getStateModelFactory(StateModelDefId stateModelName) {
return getStateModelFactory(stateModelName, HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
}
- public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName,
+ public StateTransitionHandlerFactory<? extends TransitionHandler> getStateModelFactory(StateModelDefId stateModelName,
String factoryName) {
if (!_stateModelFactoryMap.containsKey(stateModelName)) {
return null;
@@ -86,39 +83,6 @@ public class HelixStateMachineEngine implements StateMachineEngine {
return _stateModelFactoryMap.get(stateModelName).get(factoryName);
}
- @Override
- public boolean registerStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory) {
- return registerStateModelFactory(stateModelDef, factory,
- HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
- }
-
- @Override
- public boolean registerStateModelFactory(String stateModelName,
- StateModelFactory<? extends StateModel> factory, String factoryName) {
- if (stateModelName == null || factory == null || factoryName == null) {
- throw new HelixException("stateModelDef|stateModelFactory|factoryName cannot be null");
- }
-
- LOG.info("Register state model factory for state model " + stateModelName
- + " using factory name " + factoryName + " with " + factory);
-
- if (!_stateModelFactoryMap.containsKey(stateModelName)) {
- _stateModelFactoryMap.put(stateModelName,
- new ConcurrentHashMap<String, StateModelFactory<? extends StateModel>>());
- }
-
- if (_stateModelFactoryMap.get(stateModelName).containsKey(factoryName)) {
- LOG.warn("stateModelFactory for " + stateModelName + " using factoryName " + factoryName
- + " has already been registered.");
- return false;
- }
-
- _stateModelFactoryMap.get(stateModelName).put(factoryName, factory);
- sendNopMessage();
- return true;
- }
-
// TODO: duplicated code in DefaultMessagingService
private void sendNopMessage() {
if (_manager.isConnected()) {
@@ -150,11 +114,11 @@ public class HelixStateMachineEngine implements StateMachineEngine {
@Override
public void reset() {
- for (Map<String, StateModelFactory<? extends StateModel>> ftyMap : _stateModelFactoryMap
+ for (Map<String, StateTransitionHandlerFactory<? extends TransitionHandler>> ftyMap : _stateModelFactoryMap
.values()) {
- for (StateModelFactory<? extends StateModel> stateModelFactory : ftyMap.values()) {
- for (String resourceKey : stateModelFactory.getPartitionSet()) {
- StateModel stateModel = stateModelFactory.getStateModel(resourceKey);
+ for (StateTransitionHandlerFactory<? extends TransitionHandler> stateModelFactory : ftyMap.values()) {
+ for (PartitionId partition : stateModelFactory.getPartitionSet()) {
+ TransitionHandler stateModel = stateModelFactory.getTransitionHandler(partition);
stateModel.reset();
String initialState = _stateModelParser.getInitialState(stateModel.getClass());
stateModel.updateState(initialState);
@@ -191,8 +155,8 @@ public class HelixStateMachineEngine implements StateMachineEngine {
factoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
}
- StateModelFactory<? extends StateModel> stateModelFactory =
- getStateModelFactory(stateModelId.stringify(), factoryName);
+ StateTransitionHandlerFactory<? extends TransitionHandler> stateModelFactory =
+ getStateModelFactory(stateModelId, factoryName);
if (stateModelFactory == null) {
LOG.warn("Fail to create msg-handler because cannot find stateModelFactory for model: "
+ stateModelId + " using factoryName: " + factoryName + " for resource: " + resourceId);
@@ -200,7 +164,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
}
// check if the state model definition exists and cache it
- if (!_stateModelDefs.containsKey(stateModelId.stringify())) {
+ if (!_stateModelDefs.containsKey(stateModelId)) {
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
StateModelDefinition stateModelDef =
@@ -209,15 +173,15 @@ public class HelixStateMachineEngine implements StateMachineEngine {
throw new HelixException("fail to create msg-handler because stateModelDef for "
+ stateModelId + " does NOT exist");
}
- _stateModelDefs.put(stateModelId.stringify(), stateModelDef);
+ _stateModelDefs.put(stateModelId, stateModelDef);
}
if (message.getBatchMessageMode() == false) {
// create currentStateDelta for this partition
- String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
- StateModel stateModel = stateModelFactory.getStateModel(partitionKey.stringify());
+ String initState = _stateModelDefs.get(message.getStateModelDefId()).getInitialState();
+ TransitionHandler stateModel = stateModelFactory.getTransitionHandler(partitionKey);
if (stateModel == null) {
- stateModel = stateModelFactory.createAndAddStateModel(partitionKey.stringify());
+ stateModel = stateModelFactory.createAndAddSTransitionHandler(partitionKey);
stateModel.updateState(initState);
}
@@ -237,9 +201,9 @@ public class HelixStateMachineEngine implements StateMachineEngine {
currentStateDelta);
} else {
BatchMessageWrapper wrapper =
- stateModelFactory.getBatchMessageWrapper(resourceId.stringify());
+ stateModelFactory.getBatchMessageWrapper(resourceId);
if (wrapper == null) {
- wrapper = stateModelFactory.createAndAddBatchMessageWrapper(resourceId.stringify());
+ wrapper = stateModelFactory.createAndAddBatchMessageWrapper(resourceId);
}
// get executor-service for the message
@@ -259,27 +223,15 @@ public class HelixStateMachineEngine implements StateMachineEngine {
}
@Override
- public boolean removeStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory) {
- throw new UnsupportedOperationException("Remove not yet supported");
- }
-
- @Override
- public boolean removeStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory, String factoryName) {
- throw new UnsupportedOperationException("Remove not yet supported");
- }
-
- @Override
public boolean registerStateModelFactory(StateModelDefId stateModelDefId,
- HelixStateModelFactory<? extends StateModel> factory) {
+ StateTransitionHandlerFactory<? extends TransitionHandler> factory) {
return registerStateModelFactory(stateModelDefId, HelixConstants.DEFAULT_STATE_MODEL_FACTORY,
factory);
}
@Override
public boolean registerStateModelFactory(StateModelDefId stateModelDefId, String factoryName,
- HelixStateModelFactory<? extends StateModel> factory) {
+ StateTransitionHandlerFactory<? extends TransitionHandler> factory) {
if (stateModelDefId == null || factoryName == null || factory == null) {
LOG.info("stateModelDefId|factoryName|stateModelFactory is null");
return false;
@@ -288,22 +240,18 @@ public class HelixStateMachineEngine implements StateMachineEngine {
LOG.info("Registering state model factory for state-model-definition: " + stateModelDefId
+ " using factory-name: " + factoryName + " with: " + factory);
- StateModelFactory<? extends StateModel> factoryAdaptor =
- new HelixStateModelFactoryAdaptor(factory);
-
- String stateModelDefName = stateModelDefId.stringify();
- if (!_stateModelFactoryMap.containsKey(stateModelDefName)) {
- _stateModelFactoryMap.put(stateModelDefName,
- new ConcurrentHashMap<String, StateModelFactory<? extends StateModel>>());
+ if (!_stateModelFactoryMap.containsKey(stateModelDefId)) {
+ _stateModelFactoryMap.put(stateModelDefId,
+ new ConcurrentHashMap<String, StateTransitionHandlerFactory<? extends TransitionHandler>>());
}
- if (_stateModelFactoryMap.get(stateModelDefName).containsKey(factoryName)) {
+ if (_stateModelFactoryMap.get(stateModelDefId).containsKey(factoryName)) {
LOG.info("Skip register state model factory for " + stateModelDefId + " using factory-name "
+ factoryName + ", since it has already been registered.");
return false;
}
- _stateModelFactoryMap.get(stateModelDefName).put(factoryName, factoryAdaptor);
+ _stateModelFactoryMap.get(stateModelDefId).put(factoryName, factory);
sendNopMessage();
return true;
@@ -324,15 +272,14 @@ public class HelixStateMachineEngine implements StateMachineEngine {
LOG.info("Removing state model factory for state-model-definition: " + stateModelDefId
+ " using factory-name: " + factoryName);
- String stateModelDefName = stateModelDefId.stringify();
- Map<String, StateModelFactory<? extends StateModel>> ftyMap =
- _stateModelFactoryMap.get(stateModelDefName);
+ Map<String, StateTransitionHandlerFactory<? extends TransitionHandler>> ftyMap =
+ _stateModelFactoryMap.get(stateModelDefId);
if (ftyMap == null) {
LOG.info("Skip remove state model factory " + stateModelDefId + ", since it does NOT exist");
return false;
}
- StateModelFactory<? extends StateModel> fty = ftyMap.remove(factoryName);
+ StateTransitionHandlerFactory<? extends TransitionHandler> fty = ftyMap.remove(factoryName);
if (fty == null) {
LOG.info("Skip remove state model factory " + stateModelDefId + " using factory-name "
+ factoryName + ", since it does NOT exist");
@@ -340,11 +287,11 @@ public class HelixStateMachineEngine implements StateMachineEngine {
}
if (ftyMap.isEmpty()) {
- _stateModelFactoryMap.remove(stateModelDefName);
+ _stateModelFactoryMap.remove(stateModelDefId);
}
- for (String partition : fty.getPartitionSet()) {
- StateModel stateModel = fty.getStateModel(partition);
+ for (PartitionId partition : fty.getPartitionSet()) {
+ TransitionHandler stateModel = fty.getTransitionHandler(partition);
stateModel.reset();
// TODO probably should remove the state from zookeeper
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
index abb7d81..48f06d3 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
@@ -19,11 +19,10 @@ package org.apache.helix.participant;
* under the License.
*/
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
/**
* Helix participant uses this class to register/remove state model factory
@@ -32,35 +31,6 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
public interface StateMachineEngine extends MessageHandlerFactory {
/**
- * Replaced by {@link #registerStateModelFactory(StateModelDefId, HelixStateModelFactory)
-
- */
- @Deprecated
- public boolean registerStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory);
-
- /**
- * Replaced by {@link #registerStateModelFactory(StateModelDefId, String, HelixStateModelFactory)}
- */
- @Deprecated
- public boolean registerStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory, String factoryName);
-
- /**
- * Replaced by {@link #removeStateModelFactory(StateModelDefId, HelixStateModelFactory)}
- */
- @Deprecated
- public boolean removeStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory);
-
- /**
- * Replaced by {@link #removeStateModelFactory(StateModelDefId, String, HelixStateModelFactory)}
- */
- @Deprecated
- public boolean removeStateModelFactory(String stateModelDef,
- StateModelFactory<? extends StateModel> factory, String factoryName);
-
- /**
* Register a default state model factory for a state model definition
* A state model definition could be, for example:
* "MasterSlave", "OnlineOffline", "LeaderStandby", etc.
@@ -70,7 +40,7 @@ public interface StateMachineEngine extends MessageHandlerFactory {
* @return
*/
public boolean registerStateModelFactory(StateModelDefId stateModelDefId,
- HelixStateModelFactory<? extends StateModel> factory);
+ StateTransitionHandlerFactory<? extends TransitionHandler> factory);
/**
* Register a state model factory with a factory name for a state model definition
@@ -81,7 +51,7 @@ public interface StateMachineEngine extends MessageHandlerFactory {
* @return
*/
public boolean registerStateModelFactory(StateModelDefId stateModelDefId, String factoryName,
- HelixStateModelFactory<? extends StateModel> factory);
+ StateTransitionHandlerFactory<? extends TransitionHandler> factory);
/**
* Remove the default state model factory for a state model definition
[3/5] [HELIX-484] Remove CallbackHandler/ZkCallbackHandler code
duplication,
[HELIX-486] Remove StateModelFactory/HelixStateModelFactory code duplication,
rb=24332
Posted by zz...@apache.org.
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
deleted file mode 100644
index 45f56e5..0000000
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
+++ /dev/null
@@ -1,118 +0,0 @@
-package org.apache.helix.participant.statemachine;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.messaging.handling.BatchMessageWrapper;
-
-/**
- * State model factory that uses concrete id classes instead of strings.
- * Replacing {@link org.apache.helix.participant.statemachine.StateModelFactory}
- */
-public abstract class HelixStateModelFactory<T extends StateModel> {
- /**
- * map from partitionId to stateModel
- */
- private final ConcurrentMap<PartitionId, T> _stateModelMap =
- new ConcurrentHashMap<PartitionId, T>();
-
- /**
- * map from resourceName to BatchMessageWrapper
- */
- private final ConcurrentMap<ResourceId, BatchMessageWrapper> _batchMsgWrapperMap =
- new ConcurrentHashMap<ResourceId, BatchMessageWrapper>();
-
- /**
- * This method will be invoked only once per partition per session
- * @param partitionId
- * @return
- */
- public abstract T createNewStateModel(PartitionId partitionId);
-
- /**
- * Create a state model for a partition
- * @param partitionId
- */
- public T createAndAddStateModel(PartitionId partitionId) {
- T stateModel = createNewStateModel(partitionId);
- _stateModelMap.put(partitionId, stateModel);
- return stateModel;
- }
-
- /**
- * Get the state model for a partition
- * @param partitionId
- * @return state model if exists, null otherwise
- */
- public T getStateModel(PartitionId partitionId) {
- return _stateModelMap.get(partitionId);
- }
-
- /**
- * remove state model for a partition
- * @param partitionId
- * @return state model removed or null if not exist
- */
- public T removeStateModel(PartitionId partitionId) {
- return _stateModelMap.remove(partitionId);
- }
-
- /**
- * get partition set
- * @return partitionId set
- */
- public Set<PartitionId> getPartitionSet() {
- return _stateModelMap.keySet();
- }
-
- /**
- * create a default batch-message-wrapper for a resource
- * @param resourceId
- * @return
- */
- public BatchMessageWrapper createBatchMessageWrapper(ResourceId resourceId) {
- return new BatchMessageWrapper();
- }
-
- /**
- * create a batch-message-wrapper for a resource and put it into map
- * @param resourceId
- * @return
- */
- public BatchMessageWrapper createAndAddBatchMessageWrapper(ResourceId resourceId) {
- BatchMessageWrapper wrapper = createBatchMessageWrapper(resourceId);
- _batchMsgWrapperMap.put(resourceId, wrapper);
- return wrapper;
- }
-
- /**
- * get batch-message-wrapper for a resource
- * @param resourceId
- * @return
- */
- public BatchMessageWrapper getBatchMessageWrapper(ResourceId resourceId) {
- return _batchMsgWrapperMap.get(resourceId);
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
deleted file mode 100644
index 320275d..0000000
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.helix.participant.statemachine;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.api.id.PartitionId;
-
-public class HelixStateModelFactoryAdaptor<T extends StateModel> extends StateModelFactory<T> {
- final HelixStateModelFactory<T> _factory;
-
- public HelixStateModelFactoryAdaptor(HelixStateModelFactory<T> factory) {
- _factory = factory;
- }
-
- @Override
- public T createNewStateModel(String partitionName) {
- return _factory.createNewStateModel(PartitionId.from(partitionName));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
index ca67d42..954fba8 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
@@ -24,12 +24,14 @@ import java.util.Map;
import org.apache.helix.HelixException;
import org.apache.helix.NotificationContext;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
import org.apache.helix.messaging.handling.HelixTaskExecutor;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.model.Message;
import org.apache.log4j.Logger;
-public class ScheduledTaskStateModel extends StateModel {
+public class ScheduledTaskStateModel extends TransitionHandler {
static final String DEFAULT_INITIAL_STATE = "OFFLINE";
Logger logger = Logger.getLogger(ScheduledTaskStateModel.class);
@@ -37,27 +39,27 @@ public class ScheduledTaskStateModel extends StateModel {
// StateModel with initial state other than OFFLINE should override this field
protected String _currentState = DEFAULT_INITIAL_STATE;
final ScheduledTaskStateModelFactory _factory;
- final String _partitionName;
+ final PartitionId _partition;
final HelixTaskExecutor _executor;
public ScheduledTaskStateModel(ScheduledTaskStateModelFactory factory,
- HelixTaskExecutor executor, String partitionName) {
+ HelixTaskExecutor executor, PartitionId partition) {
_factory = factory;
- _partitionName = partitionName;
+ _partition = partition;
_executor = executor;
}
@Transition(to = "COMPLETED", from = "OFFLINE")
public void onBecomeCompletedFromOffline(Message message, NotificationContext context)
throws InterruptedException {
- logger.info(_partitionName + " onBecomeCompletedFromOffline");
+ logger.info(_partition + " onBecomeCompletedFromOffline");
// System.err.println("\t\t" + _partitionName + " onBecomeCompletedFromOffline");
// Construct the inner task message from the mapfields of scheduledTaskQueue resource group
Map<String, String> messageInfo =
message.getRecord().getMapField(Message.Attributes.INNER_MESSAGE.toString());
- ZNRecord record = new ZNRecord(_partitionName);
+ ZNRecord record = new ZNRecord(_partition.stringify());
record.getSimpleFields().putAll(messageInfo);
Message taskMessage = new Message(record);
if (logger.isDebugEnabled()) {
@@ -67,49 +69,49 @@ public class ScheduledTaskStateModel extends StateModel {
_executor.createMessageHandler(taskMessage, new NotificationContext(null));
if (handler == null) {
throw new HelixException("Task message " + taskMessage.getMsgType()
- + " handler not found, task id " + _partitionName);
+ + " handler not found, task id " + _partition);
}
// Invoke the internal handler to complete the task
handler.handleMessage();
- logger.info(_partitionName + " onBecomeCompletedFromOffline completed");
+ logger.info(_partition + " onBecomeCompletedFromOffline completed");
}
@Transition(to = "OFFLINE", from = "COMPLETED")
public void onBecomeOfflineFromCompleted(Message message, NotificationContext context) {
- logger.info(_partitionName + " onBecomeOfflineFromCompleted");
+ logger.info(_partition + " onBecomeOfflineFromCompleted");
}
@Transition(to = "DROPPED", from = "COMPLETED")
public void onBecomeDroppedFromCompleted(Message message, NotificationContext context) {
- logger.info(_partitionName + " onBecomeDroppedFromCompleted");
+ logger.info(_partition + " onBecomeDroppedFromCompleted");
removeFromStatemodelFactory();
}
@Transition(to = "DROPPED", from = "OFFLINE")
public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
throws InterruptedException {
- logger.info(_partitionName + " onBecomeDroppedFromScheduled");
+ logger.info(_partition + " onBecomeDroppedFromScheduled");
removeFromStatemodelFactory();
}
@Transition(to = "OFFLINE", from = "ERROR")
public void onBecomeOfflineFromError(Message message, NotificationContext context)
throws InterruptedException {
- logger.info(_partitionName + " onBecomeOfflineFromError");
+ logger.info(_partition + " onBecomeOfflineFromError");
}
@Override
public void reset() {
- logger.info(_partitionName + " ScheduledTask reset");
+ logger.info(_partition + " ScheduledTask reset");
removeFromStatemodelFactory();
}
// We need this to prevent state model leak
private void removeFromStatemodelFactory() {
- if (_factory.getStateModel(_partitionName) != null) {
- _factory.removeStateModel(_partitionName);
+ if (_factory.getTransitionHandler(_partition) != null) {
+ _factory.removeTransitionHandler(_partition);
} else {
- logger.warn(_partitionName + " not found in ScheduledTaskStateModelFactory");
+ logger.warn(_partition + " not found in ScheduledTaskStateModelFactory");
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
index a205910..e2bc461 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
@@ -19,10 +19,12 @@ package org.apache.helix.participant.statemachine;
* under the License.
*/
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.id.PartitionId;
import org.apache.helix.messaging.handling.HelixTaskExecutor;
import org.apache.log4j.Logger;
-public class ScheduledTaskStateModelFactory extends StateModelFactory<ScheduledTaskStateModel> {
+public class ScheduledTaskStateModelFactory extends StateTransitionHandlerFactory<ScheduledTaskStateModel> {
Logger logger = Logger.getLogger(ScheduledTaskStateModelFactory.class);
HelixTaskExecutor _executor;
@@ -32,8 +34,8 @@ public class ScheduledTaskStateModelFactory extends StateModelFactory<ScheduledT
}
@Override
- public ScheduledTaskStateModel createNewStateModel(String partitionName) {
- logger.info("Create state model for ScheduledTask " + partitionName);
- return new ScheduledTaskStateModel(this, _executor, partitionName);
+ public ScheduledTaskStateModel createStateTransitionHandler(PartitionId partition) {
+ logger.info("Create state model for ScheduledTask " + partition);
+ return new ScheduledTaskStateModel(this, _executor, partition);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
deleted file mode 100644
index 9717340..0000000
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package org.apache.helix.participant.statemachine;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.Message;
-import org.apache.log4j.Logger;
-
-public abstract class StateModel {
- static final String DEFAULT_INITIAL_STATE = "OFFLINE";
- Logger logger = Logger.getLogger(StateModel.class);
-
- // TODO Get default state from implementation or from state model annotation
- // StateModel with initial state other than OFFLINE should override this field
- protected String _currentState = DEFAULT_INITIAL_STATE;
-
- /**
- * requested-state is used (e.g. by task-framework) to request next state
- */
- protected String _requestedState = null;
-
- public String getCurrentState() {
- return _currentState;
- }
-
- // @transition(from='from', to='to')
- public void defaultTransitionHandler() {
- logger
- .error("Default default handler. The idea is to invoke this if no transition method is found. Yet to be implemented");
- }
-
- public boolean updateState(String newState) {
- _currentState = newState;
- return true;
- }
-
- /**
- * Get requested-state
- * @return requested-state
- */
- public String getRequestedState() {
- return _requestedState;
- }
-
- /**
- * Set requested-state
- * @param requestedState
- */
- public void setRequestedState(String requestedState) {
- _requestedState = requestedState;
- }
-
- /**
- * Called when error occurs in state transition
- * TODO:enforce subclass to write this
- * @param message
- * @param context
- * @param error
- */
- public void rollbackOnError(Message message, NotificationContext context,
- StateTransitionError error) {
-
- logger.error("Default rollback method invoked on error. Error Code: " + error.getCode());
-
- }
-
- /**
- * Called when the state model is reset
- */
- public void reset() {
- logger
- .warn("Default reset method invoked. Either because the process longer own this resource or session timedout");
- }
-
- /**
- * default transition for drop partition in error state
- * @param message
- * @param context
- * @throws InterruptedException
- */
- @Transition(to = "DROPPED", from = "ERROR")
- public void onBecomeDroppedFromError(Message message, NotificationContext context)
- throws Exception {
- logger.info("Default ERROR->DROPPED transition invoked.");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
deleted file mode 100644
index a74f67b..0000000
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.apache.helix.participant.statemachine;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.helix.messaging.handling.BatchMessageWrapper;
-
-/**
- * Replaced by {@link org.apache.helix.participant.statemachine.HelixStateModelFactory}
- */
-@Deprecated
-public abstract class StateModelFactory<T extends StateModel> {
- /**
- * mapping from partitionName to StateModel
- */
- private final ConcurrentMap<String, T> _stateModelMap = new ConcurrentHashMap<String, T>();
-
- /**
- * mapping from resourceName to BatchMessageWrapper
- */
- private final ConcurrentMap<String, BatchMessageWrapper> _batchMsgWrapperMap =
- new ConcurrentHashMap<String, BatchMessageWrapper>();
-
- /**
- * This method will be invoked only once per partitionName per session
- * @param partitionName
- * @return
- */
- public abstract T createNewStateModel(String partitionName);
-
- /**
- * Create a state model for a partition
- * @param partitionName
- */
- public T createAndAddStateModel(String partitionName) {
- T stateModel = createNewStateModel(partitionName);
- _stateModelMap.put(partitionName, stateModel);
- return stateModel;
- }
-
- /**
- * Get the state model for a partition
- * @param partitionName
- * @return state model if exists, null otherwise
- */
- public T getStateModel(String partitionName) {
- return _stateModelMap.get(partitionName);
- }
-
- /**
- * remove state model for a partition
- * @param partitionName
- * @return state model removed or null if not exist
- */
- public T removeStateModel(String partitionName) {
- return _stateModelMap.remove(partitionName);
- }
-
- /**
- * get partition set
- * @return partition key set
- */
- public Set<String> getPartitionSet() {
- return _stateModelMap.keySet();
- }
-
- /**
- * create a default batch-message-wrapper for a resource
- * @param resourceName
- * @return
- */
- public BatchMessageWrapper createBatchMessageWrapper(String resourceName) {
- return new BatchMessageWrapper();
- }
-
- /**
- * create a batch-message-wrapper for a resource and put it into map
- * @param resourceName
- * @return
- */
- public BatchMessageWrapper createAndAddBatchMessageWrapper(String resourceName) {
- BatchMessageWrapper wrapper = createBatchMessageWrapper(resourceName);
- _batchMsgWrapperMap.put(resourceName, wrapper);
- return wrapper;
- }
-
- /**
- * get batch-message-wrapper for a resource
- * @param resourceName
- * @return
- */
- public BatchMessageWrapper getBatchMessageWrapper(String resourceName) {
- return _batchMsgWrapperMap.get(resourceName);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java
index eddeaa5..d84b09a 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java
@@ -23,6 +23,7 @@ import java.lang.reflect.Method;
import java.util.Arrays;
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.model.Message;
/**
@@ -30,7 +31,7 @@ import org.apache.helix.model.Message;
*/
public class StateModelParser {
- public Method getMethodForTransition(Class<? extends StateModel> clazz, String fromState,
+ public Method getMethodForTransition(Class<? extends TransitionHandler> clazz, String fromState,
String toState, Class<?>[] paramTypes) {
Method method = getMethodForTransitionUsingAnnotation(clazz, fromState, toState, paramTypes);
if (method == null) {
@@ -48,7 +49,7 @@ public class StateModelParser {
* @param paramTypes
* @return Method if found else null
*/
- public Method getMethodForTransitionByConvention(Class<? extends StateModel> clazz,
+ public Method getMethodForTransitionByConvention(Class<? extends TransitionHandler> clazz,
String fromState, String toState, Class<?>[] paramTypes) {
Method methodToInvoke = null;
String methodName = "onBecome" + toState + "From" + fromState;
@@ -82,7 +83,7 @@ public class StateModelParser {
* @param paramTypes
* @return
*/
- public Method getMethodForTransitionUsingAnnotation(Class<? extends StateModel> clazz,
+ public Method getMethodForTransitionUsingAnnotation(Class<? extends TransitionHandler> clazz,
String fromState, String toState, Class<?>[] paramTypes) {
StateModelInfo stateModelInfo = clazz.getAnnotation(StateModelInfo.class);
Method methodToInvoke = null;
@@ -114,12 +115,12 @@ public class StateModelParser {
* @param clazz
* @return
*/
- public String getInitialState(Class<? extends StateModel> clazz) {
+ public String getInitialState(Class<? extends TransitionHandler> clazz) {
StateModelInfo stateModelInfo = clazz.getAnnotation(StateModelInfo.class);
if (stateModelInfo != null) {
return stateModelInfo.initialState();
} else {
- return StateModel.DEFAULT_INITIAL_STATE;
+ return TransitionHandler.DEFAULT_INITIAL_STATE;
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index 66abba6..45ee71c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -20,7 +20,7 @@ package org.apache.helix.task;
*/
import org.apache.helix.HelixManager;
-import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.task.TaskResult.Status;
import org.apache.log4j.Logger;
@@ -30,7 +30,7 @@ import org.apache.log4j.Logger;
*/
public class TaskRunner implements Runnable {
private static final Logger LOG = Logger.getLogger(TaskRunner.class);
- private final StateModel _taskStateModel;
+ private final TransitionHandler _taskStateModel;
private final HelixManager _manager;
private final String _taskName;
private final String _taskPartition;
@@ -50,7 +50,7 @@ public class TaskRunner implements Runnable {
// If true, indicates that the task has finished.
private volatile boolean _done = false;
- public TaskRunner(StateModel taskStateModel, Task task, String taskName, String taskPartition,
+ public TaskRunner(TransitionHandler taskStateModel, Task task, String taskName, String taskPartition,
String instance, HelixManager manager, String sessionId) {
_taskStateModel = taskStateModel;
_task = task;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index a44a8cb..69a1b88 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -28,14 +28,14 @@ import java.util.concurrent.ThreadFactory;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.log4j.Logger;
@StateModelInfo(states = "{'NOT USED BY HELIX'}", initialState = "INIT")
-public class TaskStateModel extends StateModel {
+public class TaskStateModel extends TransitionHandler {
private static final Logger LOG = Logger.getLogger(TaskStateModel.class);
private final HelixManager _manager;
private final ExecutorService _taskExecutor;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index 2537747..8fb5690 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -22,13 +22,13 @@ package org.apache.helix.task;
import java.util.Map;
import org.apache.helix.HelixManager;
+import org.apache.helix.api.StateTransitionHandlerFactory;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
/**
* Factory class for {@link TaskStateModel}.
*/
-public class TaskStateModelFactory extends HelixStateModelFactory<TaskStateModel> {
+public class TaskStateModelFactory extends StateTransitionHandlerFactory<TaskStateModel> {
private final HelixManager _manager;
private final Map<String, TaskFactory> _taskFactoryRegistry;
@@ -38,7 +38,7 @@ public class TaskStateModelFactory extends HelixStateModelFactory<TaskStateModel
}
@Override
- public TaskStateModel createNewStateModel(PartitionId partitionId) {
+ public TaskStateModel createStateTransitionHandler(PartitionId partitionId) {
return new TaskStateModel(_manager, _taskFactoryRegistry);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 6599b33..7402ca7 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -400,7 +400,7 @@ public class ClusterSetup {
return;
}
try {
- int replica = Integer.parseInt(idealState.getReplicas());
+ Integer.parseInt(idealState.getReplicas());
} catch (Exception e) {
_logger.error("", e);
return;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
index f51aa1d..8e80f15 100644
--- a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
+++ b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
@@ -19,6 +19,7 @@ package org.apache.helix;
* under the License.
*/
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
import org.apache.helix.mock.participant.DummyProcess.DummyStateModelFactory;
@@ -43,14 +44,14 @@ public class DummyProcessThread implements Runnable {
// StateMachineEngine genericStateMachineHandler =
// new StateMachineEngine();
StateMachineEngine stateMach = _manager.getStateMachineEngine();
- stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+ stateMach.registerStateModelFactory(StateModelDefId.MasterSlave, stateModelFactory);
DummyLeaderStandbyStateModelFactory stateModelFactory1 =
new DummyLeaderStandbyStateModelFactory(10);
DummyOnlineOfflineStateModelFactory stateModelFactory2 =
new DummyOnlineOfflineStateModelFactory(10);
- stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory1);
- stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
+ stateMach.registerStateModelFactory(StateModelDefId.LeaderStandby, stateModelFactory1);
+ stateMach.registerStateModelFactory(StateModelDefId.OnlineOffline, stateModelFactory2);
// _manager.getMessagingService()
// .registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
// genericStateMachineHandler);
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 0303f12..e6b64fc 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -31,6 +31,7 @@ import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.HelixTaskExecutor;
import org.apache.helix.messaging.handling.HelixTaskResult;
@@ -39,7 +40,6 @@ import org.apache.helix.messaging.handling.MessageTask;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.Message;
import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -189,7 +189,7 @@ public class Mocks {
}
- public static class MockStateModel extends StateModel {
+ public static class MockStateModel extends TransitionHandler {
boolean stateModelInvoked = false;
public void onBecomeMasterFromSlave(Message msg, NotificationContext context) {
@@ -202,7 +202,7 @@ public class Mocks {
}
@StateModelInfo(states = "{'OFFLINE','SLAVE','MASTER'}", initialState = "OFFINE")
- public static class MockStateModelAnnotated extends StateModel {
+ public static class MockStateModelAnnotated extends TransitionHandler {
boolean stateModelInvoked = false;
@Transition(from = "SLAVE", to = "MASTER")
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
index a3b16e5..d16417e 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
@@ -24,6 +24,7 @@ import org.apache.helix.Mocks.MockManager;
import org.apache.helix.Mocks.MockStateModel;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
@@ -36,7 +37,6 @@ import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.tools.StateModelConfigGenerator;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
@@ -75,10 +75,10 @@ public class TestHelixTaskExecutor {
CurrentState currentStateDelta = new CurrentState("TestDB");
currentStateDelta.setState(PartitionId.from("TestDB_0"), State.from("OFFLINE"));
- StateModelFactory<MockStateModel> stateModelFactory = new StateModelFactory<MockStateModel>() {
+ StateTransitionHandlerFactory<MockStateModel> stateModelFactory = new StateTransitionHandlerFactory<MockStateModel>() {
@Override
- public MockStateModel createNewStateModel(String partitionName) {
+ public MockStateModel createStateTransitionHandler(PartitionId partition) {
// TODO Auto-generated method stub
return new MockStateModel();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
index 43b4407..21a3ae7 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
@@ -26,6 +26,7 @@ import org.apache.helix.Mocks.MockStateModel;
import org.apache.helix.Mocks.MockStateModelAnnotated;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
@@ -38,7 +39,6 @@ import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.tools.StateModelConfigGenerator;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
@@ -115,11 +115,11 @@ public class TestHelixTaskHandler {
CurrentState currentStateDelta = new CurrentState("TestDB");
currentStateDelta.setState(PartitionId.from("TestDB_0"), State.from("OFFLINE"));
- StateModelFactory<MockStateModelAnnotated> stateModelFactory =
- new StateModelFactory<MockStateModelAnnotated>() {
+ StateTransitionHandlerFactory<MockStateModelAnnotated> stateModelFactory =
+ new StateTransitionHandlerFactory<MockStateModelAnnotated>() {
@Override
- public MockStateModelAnnotated createNewStateModel(String partitionName) {
+ public MockStateModelAnnotated createStateTransitionHandler(PartitionId partitionName) {
// TODO Auto-generated method stub
return new MockStateModelAnnotated();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/TestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index 879e727..4a9139f 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -37,8 +37,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IDefaultNameSpace;
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkServer;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.commons.io.FileUtils;
@@ -48,7 +46,6 @@ import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.manager.zk.CallbackHandler;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -772,30 +769,4 @@ public class TestHelper {
System.out.println(sb.toString());
}
- public static void printZkListeners(ZkClient client) throws Exception {
- Map<String, Set<IZkDataListener>> datalisteners = ZkTestHelper.getZkDataListener(client);
- Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(client);
-
- System.out.println("dataListeners {");
- for (String path : datalisteners.keySet()) {
- System.out.println("\t" + path + ": ");
- Set<IZkDataListener> set = datalisteners.get(path);
- for (IZkDataListener listener : set) {
- CallbackHandler handler = (CallbackHandler) listener;
- System.out.println("\t\t" + handler.getListener());
- }
- }
- System.out.println("}");
-
- System.out.println("childListeners {");
- for (String path : childListeners.keySet()) {
- System.out.println("\t" + path + ": ");
- Set<IZkChildListener> set = childListeners.get(path);
- for (IZkChildListener listener : set) {
- CallbackHandler handler = (CallbackHandler) listener;
- System.out.println("\t\t" + handler.getListener());
- }
- }
- System.out.println("}");
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
index 5f37845..93d168b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.api.id.StateModelFactoryId;
import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.manager.zk.MockController;
@@ -111,8 +112,8 @@ public class TestAddStateModelFactoryAfterConnect extends ZkTestBase {
// register "TestDB1_Factory" state model factory
// Logger.getRootLogger().setLevel(Level.INFO);
for (int i = 0; i < n; i++) {
- participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave",
- new MockMSModelFactory(), "TestDB1_Factory");
+ participants[i].getStateMachineEngine().registerStateModelFactory(StateModelDefId.MasterSlave,
+ "TestDB1_Factory", new MockMSModelFactory());
}
result =
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
index 6a6837a..789ae70 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
@@ -24,6 +24,8 @@ import java.util.Date;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.manager.zk.MockController;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -57,7 +59,7 @@ public class TestBatchMessageWrapper extends ZkTestBase {
class TestMockMSModelFactory extends MockMSModelFactory {
@Override
- public BatchMessageWrapper createBatchMessageWrapper(String resourceName) {
+ public BatchMessageWrapper createBatchMessageWrapper(ResourceId resource) {
return new MockBatchMsgWrapper();
}
}
@@ -100,7 +102,7 @@ public class TestBatchMessageWrapper extends ZkTestBase {
String instanceName = "localhost_" + (12918 + i);
ftys[i] = new TestMockMSModelFactory();
participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
- participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", ftys[i]);
+ participants[i].getStateMachineEngine().registerStateModelFactory(StateModelDefId.MasterSlave, ftys[i]);
participants[i].syncStart();
// wait for each participant to complete state transitions, so we have deterministic results
@@ -117,7 +119,7 @@ public class TestBatchMessageWrapper extends ZkTestBase {
}
// check batch-msg-wrapper counts
- MockBatchMsgWrapper wrapper = (MockBatchMsgWrapper) ftys[0].getBatchMessageWrapper("TestDB0");
+ MockBatchMsgWrapper wrapper = (MockBatchMsgWrapper) ftys[0].getBatchMessageWrapper(ResourceId.from("TestDB0"));
// System.out.println("startCount: " + wrapper._startCount);
Assert.assertEquals(wrapper._startCount, 3,
"Expect 3 batch.start: O->S, S->M, and M->S for 1st participant");
@@ -125,7 +127,7 @@ public class TestBatchMessageWrapper extends ZkTestBase {
Assert.assertEquals(wrapper._endCount, 3,
"Expect 3 batch.end: O->S, S->M, and M->S for 1st participant");
- wrapper = (MockBatchMsgWrapper) ftys[1].getBatchMessageWrapper("TestDB0");
+ wrapper = (MockBatchMsgWrapper) ftys[1].getBatchMessageWrapper(ResourceId.from("TestDB0"));
// System.out.println("startCount: " + wrapper._startCount);
Assert.assertEquals(wrapper._startCount, 2,
"Expect 2 batch.start: O->S and S->M for 2nd participant");
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
index abb2a7b..e4e844a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
@@ -27,13 +27,13 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.MockController;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.StateTransitionError;
import org.apache.helix.participant.statemachine.Transition;
@@ -145,7 +145,7 @@ public class TestCorrectnessOnConnectivityLoss {
@StateModelInfo(initialState = "OFFLINE", states = {
"MASTER", "SLAVE", "OFFLINE", "ERROR"
})
- public static class MyStateModel extends StateModel {
+ public static class MyStateModel extends TransitionHandler {
private final Map<String, Integer> _counts;
public MyStateModel(Map<String, Integer> counts) {
@@ -189,7 +189,7 @@ public class TestCorrectnessOnConnectivityLoss {
}
}
- public static class MyStateModelFactory extends HelixStateModelFactory<MyStateModel> {
+ public static class MyStateModelFactory extends StateTransitionHandlerFactory<MyStateModel> {
private final Map<String, Integer> _counts;
@@ -198,7 +198,7 @@ public class TestCorrectnessOnConnectivityLoss {
}
@Override
- public MyStateModel createNewStateModel(PartitionId partitionId) {
+ public MyStateModel createStateTransitionHandler(PartitionId partitionId) {
return new MyStateModel(_counts);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
deleted file mode 100644
index c2f9a5c..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package org.apache.helix.integration;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper;
-import org.apache.helix.manager.zk.MockParticipant;
-import org.apache.helix.manager.zk.MockMultiClusterController;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.testutil.ZkTestBase;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestDistributedClusterController extends ZkTestBase {
-
- @Test
- public void testDistributedClusterController() throws Exception {
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterNamePrefix = className + "_" + methodName;
- final int n = 5;
- final int clusterNb = 10;
-
- System.out
- .println("START " + clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
-
- // setup 10 clusters
- for (int i = 0; i < clusterNb; i++) {
- String clusterName = clusterNamePrefix + "0_" + i;
- String participantName = "localhost" + i;
- String resourceName = "TestDB" + i;
- TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
- participantName, // participant name prefix
- resourceName, // resource name prefix
- 1, // resources
- 8, // partitions per resource
- n, // number of nodes
- 3, // replicas
- "MasterSlave", true); // do rebalance
- }
-
- // setup controller cluster
- final String controllerClusterName = "CONTROLLER_" + clusterNamePrefix;
- TestHelper.setupCluster("CONTROLLER_" + clusterNamePrefix, _zkaddr, 0, // controller
- // port
- "controller", // participant name prefix
- clusterNamePrefix, // resource name prefix
- 1, // resources
- clusterNb, // partitions per resource
- n, // number of nodes
- 3, // replicas
- "LeaderStandby", true); // do rebalance
-
- // start distributed cluster controllers
- MockMultiClusterController[] controllers = new MockMultiClusterController[n];
- for (int i = 0; i < n; i++) {
- controllers[i] =
- new MockMultiClusterController(_zkaddr, controllerClusterName, "controller_" + i);
- controllers[i].syncStart();
- }
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(
- new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr, controllerClusterName),
- 30000);
- Assert.assertTrue(result, "Controller cluster NOT in ideal state");
-
- // start first cluster
- MockParticipant[] participants = new MockParticipant[n];
- final String firstClusterName = clusterNamePrefix + "0_0";
- for (int i = 0; i < n; i++) {
- String instanceName = "localhost0_" + (12918 + i);
- participants[i] = new MockParticipant(_zkaddr, firstClusterName, instanceName);
- participants[i].syncStart();
- }
-
- result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
- firstClusterName));
- Assert.assertTrue(result, "first cluster NOT in ideal state");
-
- // stop current leader in controller cluster
- ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(controllerClusterName, _baseAccessor);
- Builder keyBuilder = accessor.keyBuilder();
- LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
- String leaderName = leader.getId();
- int j = Integer.parseInt(leaderName.substring(leaderName.lastIndexOf('_') + 1));
- controllers[j].syncStop();
-
- // setup the second cluster
- MockParticipant[] participants2 = new MockParticipant[n];
- final String secondClusterName = clusterNamePrefix + "0_1";
- for (int i = 0; i < n; i++) {
- String instanceName = "localhost1_" + (12918 + i);
- participants2[i] = new MockParticipant(_zkaddr, secondClusterName, instanceName);
- participants2[i].syncStart();
- }
-
- result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
- secondClusterName));
- Assert.assertTrue(result, "second cluster NOT in ideal state");
-
- // clean up
- // wait for all zk callbacks done
- System.out.println("Cleaning up...");
- for (int i = 0; i < 5; i++) {
- result =
- ClusterStateVerifier
- .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr,
- controllerClusterName));
- controllers[i].syncStop();
- }
-
- for (int i = 0; i < 5; i++) {
- participants[i].syncStop();
- }
-
- System.out.println("END " + clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
index 19af9a7..648fc4c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
@@ -56,11 +56,8 @@ public class TestErrorPartition extends ZkTestBase {
String instanceName = "localhost_" + (12918 + i);
if (i == 0) {
- Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
- {
- put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
- }
- };
+ Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+ errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
participants[i].setTransition(new ErrTransition(errPartitions));
} else {
@@ -77,11 +74,8 @@ public class TestErrorPartition extends ZkTestBase {
_zkaddr, clusterName, errStates));
Assert.assertTrue(result);
- Map<String, Set<String>> errorStateMap = new HashMap<String, Set<String>>() {
- {
- put("TestDB0_4", TestHelper.setOf("localhost_12918"));
- }
- };
+ Map<String, Set<String>> errorStateMap = new HashMap<String, Set<String>>();
+ errorStateMap.put("TestDB0_4", TestHelper.setOf("localhost_12918"));
// verify "TestDB0_0", "localhost_12918" is in ERROR state
TestHelper.verifyState(clusterName, _zkaddr, errorStateMap, "ERROR");
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
index 3d02ae8..484ae8c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
@@ -30,6 +30,8 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.accessor.ClusterAccessor;
import org.apache.helix.api.config.ClusterConfig;
import org.apache.helix.api.config.ParticipantConfig;
@@ -46,8 +48,6 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.AutoModeISBuilder;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.helix.testutil.ZkTestBase;
@@ -61,7 +61,7 @@ public class TestHelixConnection extends ZkTestBase {
@StateModelInfo(initialState = "OFFLINE", states = {
"MASTER", "SLAVE", "OFFLINE", "ERROR"
})
- public static class MockStateModel extends StateModel {
+ public static class MockStateModel extends TransitionHandler {
public MockStateModel() {
}
@@ -74,13 +74,13 @@ public class TestHelixConnection extends ZkTestBase {
}
}
- public static class MockStateModelFactory extends HelixStateModelFactory<MockStateModel> {
+ public static class MockStateModelFactory extends StateTransitionHandlerFactory<MockStateModel> {
public MockStateModelFactory() {
}
@Override
- public MockStateModel createNewStateModel(PartitionId partitionId) {
+ public MockStateModel createStateTransitionHandler(PartitionId partitionId) {
MockStateModel model = new MockStateModel();
return model;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
index 496a16f..95673d4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
@@ -39,7 +39,10 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -54,8 +57,6 @@ import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.ConstraintItemBuilder;
import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.helix.testutil.ZkTestBase;
@@ -245,8 +246,6 @@ public class TestMessageThrottle2 extends ZkTestBase {
}
public void start() throws Exception {
-// helixManager =
-// new ZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, _zkaddr);
participant = new MockParticipant(_zkaddr, clusterName, instanceName);
{
// hack to set sessionTimeout
@@ -256,7 +255,7 @@ public class TestMessageThrottle2 extends ZkTestBase {
}
StateMachineEngine stateMach = participant.getStateMachineEngine();
- stateMach.registerStateModelFactory("MasterSlave", new MyStateModelFactory(participant));
+ stateMach.registerStateModelFactory(StateModelDefId.MasterSlave, new MyStateModelFactory(participant));
participant.connect();
StatusPrinter statusPrinter = new StatusPrinter();
@@ -271,13 +270,11 @@ public class TestMessageThrottle2 extends ZkTestBase {
@StateModelInfo(initialState = "OFFLINE", states = {
"MASTER", "SLAVE", "ERROR"
})
- public static class MyStateModel extends StateModel {
+ public static class MyStateModel extends TransitionHandler {
private static final Logger LOGGER = Logger.getLogger(MyStateModel.class);
- private final HelixManager helixManager;
public MyStateModel(HelixManager helixManager) {
- this.helixManager = helixManager;
}
@Transition(to = "SLAVE", from = "OFFLINE")
@@ -323,7 +320,7 @@ public class TestMessageThrottle2 extends ZkTestBase {
}
}
- static class MyStateModelFactory extends StateModelFactory<MyStateModel> {
+ static class MyStateModelFactory extends StateTransitionHandlerFactory<MyStateModel> {
private final HelixManager helixManager;
public MyStateModelFactory(HelixManager helixManager) {
@@ -331,7 +328,7 @@ public class TestMessageThrottle2 extends ZkTestBase {
}
@Override
- public MyStateModel createNewStateModel(String partitionName) {
+ public MyStateModel createStateTransitionHandler(PartitionId partitionName) {
return new MyStateModel(helixManager);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
index 08954e5..eb2bec1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
@@ -267,8 +267,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
cr.setSessionSpecific(false);
AsyncCallback asyncCallback = new MockAsyncCallback();
- int messagesSent =
- _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback, 60000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback, 60000);
AssertJUnit.assertTrue(asyncCallback.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage")
@@ -276,7 +275,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
AssertJUnit.assertTrue(asyncCallback.getMessageReplied().size() == 1);
AsyncCallback asyncCallback2 = new MockAsyncCallback();
- messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback2, 500);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback2, 500);
AssertJUnit.assertTrue(asyncCallback2.isTimedOut());
}
@@ -287,7 +286,6 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
for (int i = 0; i < NODE_NR; i++) {
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- String hostDest = "localhost_" + (START_PORT + i);
_participants[0].getMessagingService().registerMessageHandlerFactory(
factory.getMessageType(), factory);
@@ -307,8 +305,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
cr.setSessionSpecific(false);
AsyncCallback callback1 = new MockAsyncCallback();
- int messageSent1 =
- _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage")
@@ -316,32 +313,28 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR - 1);
AsyncCallback callback2 = new MockAsyncCallback();
- int messageSent2 = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 500);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 500);
AssertJUnit.assertTrue(callback2.isTimedOut());
cr.setPartition("TestDB_17");
AsyncCallback callback3 = new MockAsyncCallback();
- int messageSent3 =
- _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
AssertJUnit.assertTrue(callback3.getMessageReplied().size() == _replica - 1);
cr.setPartition("TestDB_15");
AsyncCallback callback4 = new MockAsyncCallback();
- int messageSent4 =
- _participants[0].getMessagingService().sendAndWait(cr, msg, callback4, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback4, 10000);
AssertJUnit.assertTrue(callback4.getMessageReplied().size() == _replica);
cr.setPartitionState("SLAVE");
AsyncCallback callback5 = new MockAsyncCallback();
- int messageSent5 =
- _participants[0].getMessagingService().sendAndWait(cr, msg, callback5, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback5, 10000);
AssertJUnit.assertTrue(callback5.getMessageReplied().size() == _replica - 1);
cr.setDataSource(DataSource.IDEALSTATES);
AsyncCallback callback6 = new MockAsyncCallback();
- int messageSent6 =
- _participants[0].getMessagingService().sendAndWait(cr, msg, callback6, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback6, 10000);
AssertJUnit.assertTrue(callback6.getMessageReplied().size() == _replica - 1);
}
@@ -351,7 +344,6 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
for (int i = 0; i < NODE_NR; i++) {
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- String hostDest = "localhost_" + (START_PORT + i);
_participants[i].getMessagingService().registerMessageHandlerFactory(
factory.getMessageType(), factory);
@@ -372,8 +364,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
cr.setSessionSpecific(false);
cr.setSelfExcluded(false);
AsyncCallback callback1 = new MockAsyncCallback();
- int messageSent1 =
- _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR);
AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
@@ -387,7 +378,6 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
for (int i = 0; i < NODE_NR; i++) {
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- String hostDest = "localhost_" + (START_PORT + i);
_participants[i].getMessagingService().registerMessageHandlerFactory(
factory.getMessageType(), factory);
@@ -408,8 +398,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
cr.setSessionSpecific(false);
AsyncCallback callback1 = new MockAsyncCallback();
- int messagesSent =
- _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
@@ -420,7 +409,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
msg.setMessageId(msgId);
cr.setPartition("TestDB_17");
AsyncCallback callback2 = new MockAsyncCallback();
- messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 10000);
AssertJUnit.assertTrue(callback2.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
@@ -432,7 +421,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
msg.setMessageId(msgId);
cr.setPartitionState("SLAVE");
AsyncCallback callback3 = new MockAsyncCallback();
- messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
AssertJUnit.assertTrue(callback3.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
.indexOf(hostSrc) != -1);
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestMultiClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMultiClusterController.java b/helix-core/src/test/java/org/apache/helix/integration/TestMultiClusterController.java
new file mode 100644
index 0000000..8945843
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMultiClusterController.java
@@ -0,0 +1,144 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockMultiClusterController;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.testutil.ZkTestBase;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestMultiClusterController extends ZkTestBase {
+
+ @Test
+ public void test() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterNamePrefix = className + "_" + methodName;
+ final int n = 5;
+ final int clusterNb = 10;
+
+ System.out
+ .println("START " + clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
+
+ // setup 10 clusters
+ for (int i = 0; i < clusterNb; i++) {
+ String clusterName = clusterNamePrefix + "0_" + i;
+ String participantName = "localhost" + i;
+ String resourceName = "TestDB" + i;
+ TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
+ participantName, // participant name prefix
+ resourceName, // resource name prefix
+ 1, // resources
+ 8, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave", true); // do rebalance
+ }
+
+ // setup controller cluster
+ final String controllerClusterName = "CONTROLLER_" + clusterNamePrefix;
+ TestHelper.setupCluster("CONTROLLER_" + clusterNamePrefix, _zkaddr, 0, // controller
+ // port
+ "controller", // participant name prefix
+ clusterNamePrefix, // resource name prefix
+ 1, // resources
+ clusterNb, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "LeaderStandby", true); // do rebalance
+
+ // start distributed cluster controllers
+ MockMultiClusterController[] controllers = new MockMultiClusterController[n];
+ for (int i = 0; i < n; i++) {
+ controllers[i] =
+ new MockMultiClusterController(_zkaddr, controllerClusterName, "controller_" + i);
+ controllers[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr, controllerClusterName),
+ 30000);
+ Assert.assertTrue(result, "Controller cluster NOT in ideal state");
+
+ // start first cluster
+ MockParticipant[] participants = new MockParticipant[n];
+ final String firstClusterName = clusterNamePrefix + "0_0";
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost0_" + (12918 + i);
+ participants[i] = new MockParticipant(_zkaddr, firstClusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
+ firstClusterName));
+ Assert.assertTrue(result, "first cluster NOT in ideal state");
+
+ // stop current leader in controller cluster
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(controllerClusterName, _baseAccessor);
+ Builder keyBuilder = accessor.keyBuilder();
+ LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+ String leaderName = leader.getId();
+ int j = Integer.parseInt(leaderName.substring(leaderName.lastIndexOf('_') + 1));
+ controllers[j].syncStop();
+
+ // setup the second cluster
+ MockParticipant[] participants2 = new MockParticipant[n];
+ final String secondClusterName = clusterNamePrefix + "0_1";
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost1_" + (12918 + i);
+ participants2[i] = new MockParticipant(_zkaddr, secondClusterName, instanceName);
+ participants2[i].syncStart();
+ }
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
+ secondClusterName));
+ Assert.assertTrue(result, "second cluster NOT in ideal state");
+
+ // clean up
+ // wait for all zk callbacks done
+ System.out.println("Cleaning up...");
+ for (int i = 0; i < 5; i++) {
+ result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr,
+ controllerClusterName));
+ controllers[i].syncStop();
+ }
+
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
index 105633a..d5f5458 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
@@ -22,6 +22,7 @@ package org.apache.helix.integration;
import java.util.Date;
import org.apache.helix.TestHelper;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.manager.zk.MockController;
import org.apache.helix.mock.participant.MockBootstrapModelFactory;
@@ -65,7 +66,7 @@ public class TestNonOfflineInitState extends ZkTestBase {
// add a state model with non-OFFLINE initial state
StateMachineEngine stateMach = participants[i].getStateMachineEngine();
MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory();
- stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory);
+ stateMach.registerStateModelFactory(StateModelDefId.from("Bootstrap"), bootstrapFactory);
participants[i].syncStart();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
index 823a9ce..0962921 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
@@ -28,6 +28,10 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.manager.zk.MockController;
import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -38,8 +42,6 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.ConstraintItemBuilder;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.testutil.ZkTestBase;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
@@ -52,7 +54,7 @@ public class TestPartitionLevelTransitionConstraint extends ZkTestBase {
final Queue<Message> _msgOrderList = new ConcurrentLinkedQueue<Message>();
- public class BootstrapStateModel extends StateModel {
+ public class BootstrapStateModel extends TransitionHandler {
public void onBecomeBootstrapFromOffline(Message message, NotificationContext context) {
LOG.info("Become Bootstrap from Offline");
_msgOrderList.add(message);
@@ -85,10 +87,10 @@ public class TestPartitionLevelTransitionConstraint extends ZkTestBase {
}
- public class BootstrapStateModelFactory extends StateModelFactory<BootstrapStateModel> {
+ public class BootstrapStateModelFactory extends StateTransitionHandlerFactory<BootstrapStateModel> {
@Override
- public BootstrapStateModel createNewStateModel(String stateUnitKey) {
+ public BootstrapStateModel createStateTransitionHandler(PartitionId partition) {
BootstrapStateModel model = new BootstrapStateModel();
return model;
}
@@ -145,7 +147,7 @@ public class TestPartitionLevelTransitionConstraint extends ZkTestBase {
String instanceName1 = "localhost_12918";
participants[0] = new MockParticipant(_zkaddr, clusterName, instanceName1);
- participants[0].getStateMachineEngine().registerStateModelFactory("Bootstrap",
+ participants[0].getStateMachineEngine().registerStateModelFactory(StateModelDefId.from("Bootstrap"),
new BootstrapStateModelFactory());
participants[0].syncStart();
@@ -158,7 +160,7 @@ public class TestPartitionLevelTransitionConstraint extends ZkTestBase {
// start 2nd participant which will be the master for Test0_0
String instanceName2 = "localhost_12919";
participants[1] = new MockParticipant(_zkaddr, clusterName, instanceName2);
- participants[1].getStateMachineEngine().registerStateModelFactory("Bootstrap",
+ participants[1].getStateMachineEngine().registerStateModelFactory(StateModelDefId.from("Bootstrap"),
new BootstrapStateModelFactory());
participants[1].syncStart();
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
index 06a2b56..7eb054d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
@@ -33,6 +33,10 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.IdealState;
@@ -41,8 +45,6 @@ import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.ConstraintItemBuilder;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.testutil.ZkTestBase;
import org.apache.log4j.Logger;
import org.testng.Assert;
@@ -152,7 +154,7 @@ public class TestPreferenceListAsQueue extends ZkTestBase {
participants[i] =
HelixManagerFactory.getZKHelixManager(_clusterName, instanceInfoArray[i],
InstanceType.PARTICIPANT, _zkaddr);
- participants[i].getStateMachineEngine().registerStateModelFactory(_stateModel,
+ participants[i].getStateMachineEngine().registerStateModelFactory(StateModelDefId.from(_stateModel),
new PrefListTaskOnlineOfflineStateModelFactory());
participants[i].connect();
}
@@ -419,7 +421,7 @@ public class TestPreferenceListAsQueue extends ZkTestBase {
* should be run, and then the instance should be removed from the preference list for the task
* (padded by spaces). All other transitions are no-ops.
*/
- public class PrefListTaskOnlineOfflineStateModel extends StateModel {
+ public class PrefListTaskOnlineOfflineStateModel extends TransitionHandler {
/**
* Run the task. The parallelism of this is dictated by the constraints that are set.
* @param message
@@ -477,9 +479,9 @@ public class TestPreferenceListAsQueue extends ZkTestBase {
}
public class PrefListTaskOnlineOfflineStateModelFactory extends
- StateModelFactory<PrefListTaskOnlineOfflineStateModel> {
+ StateTransitionHandlerFactory<PrefListTaskOnlineOfflineStateModel> {
@Override
- public PrefListTaskOnlineOfflineStateModel createNewStateModel(String partitionName) {
+ public PrefListTaskOnlineOfflineStateModel createStateTransitionHandler(PartitionId partitionName) {
return new PrefListTaskOnlineOfflineStateModel();
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
index 5804744..f3fe6f2 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
@@ -59,12 +59,9 @@ public class TestResetInstance extends ZkTestBase {
new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
- Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
- {
- put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
- put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
- }
- };
+ Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+ errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+ errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
// start mock participants
MockParticipant[] participants = new MockParticipant[n];
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
index 4855b3d..c7386b4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
@@ -84,12 +84,9 @@ public class TestResetPartitionState extends ZkTestBase {
new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
- Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
- {
- put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
- put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
- }
- };
+ Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+ errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+ errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
// start mock participants
MockParticipant[] participants = new MockParticipant[n];
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
index 7d28931..92f1c43 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
@@ -58,12 +58,9 @@ public class TestResetResource extends ZkTestBase {
new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
- Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
- {
- put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
- put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
- }
- };
+ Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+ errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+ errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
// start mock participants
MockParticipant[] participants = new MockParticipant[n];
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 89af602..8accc75 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -115,7 +115,6 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
public HelixTaskResult handleMessage() throws InterruptedException {
HelixTaskResult result = new HelixTaskResult();
result.setSuccess(true);
- String destName = _message.getTgtName();
String partitionName = _message.getPartitionName();
result.getTaskResultMap().put("Message", _message.getMsgId());
synchronized (_results) {