You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/08/07 22:46:40 UTC
[3/5] [HELIX-484] Remove CallbackHandler/ZkCallbackHandler code
duplication,
[HELIX-486] Remove StateModelFactory/HelixStateModelFactory code duplication,
rb=24332
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
deleted file mode 100644
index 45f56e5..0000000
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
+++ /dev/null
@@ -1,118 +0,0 @@
-package org.apache.helix.participant.statemachine;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.messaging.handling.BatchMessageWrapper;
-
-/**
- * State model factory that uses concrete id classes instead of strings.
- * Replacing {@link org.apache.helix.participant.statemachine.StateModelFactory}
- */
-public abstract class HelixStateModelFactory<T extends StateModel> {
- /**
- * map from partitionId to stateModel
- */
- private final ConcurrentMap<PartitionId, T> _stateModelMap =
- new ConcurrentHashMap<PartitionId, T>();
-
- /**
- * map from resourceName to BatchMessageWrapper
- */
- private final ConcurrentMap<ResourceId, BatchMessageWrapper> _batchMsgWrapperMap =
- new ConcurrentHashMap<ResourceId, BatchMessageWrapper>();
-
- /**
- * This method will be invoked only once per partition per session
- * @param partitionId
- * @return
- */
- public abstract T createNewStateModel(PartitionId partitionId);
-
- /**
- * Create a state model for a partition
- * @param partitionId
- */
- public T createAndAddStateModel(PartitionId partitionId) {
- T stateModel = createNewStateModel(partitionId);
- _stateModelMap.put(partitionId, stateModel);
- return stateModel;
- }
-
- /**
- * Get the state model for a partition
- * @param partitionId
- * @return state model if exists, null otherwise
- */
- public T getStateModel(PartitionId partitionId) {
- return _stateModelMap.get(partitionId);
- }
-
- /**
- * remove state model for a partition
- * @param partitionId
- * @return state model removed or null if not exist
- */
- public T removeStateModel(PartitionId partitionId) {
- return _stateModelMap.remove(partitionId);
- }
-
- /**
- * get partition set
- * @return partitionId set
- */
- public Set<PartitionId> getPartitionSet() {
- return _stateModelMap.keySet();
- }
-
- /**
- * create a default batch-message-wrapper for a resource
- * @param resourceId
- * @return
- */
- public BatchMessageWrapper createBatchMessageWrapper(ResourceId resourceId) {
- return new BatchMessageWrapper();
- }
-
- /**
- * create a batch-message-wrapper for a resource and put it into map
- * @param resourceId
- * @return
- */
- public BatchMessageWrapper createAndAddBatchMessageWrapper(ResourceId resourceId) {
- BatchMessageWrapper wrapper = createBatchMessageWrapper(resourceId);
- _batchMsgWrapperMap.put(resourceId, wrapper);
- return wrapper;
- }
-
- /**
- * get batch-message-wrapper for a resource
- * @param resourceId
- * @return
- */
- public BatchMessageWrapper getBatchMessageWrapper(ResourceId resourceId) {
- return _batchMsgWrapperMap.get(resourceId);
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
deleted file mode 100644
index 320275d..0000000
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.helix.participant.statemachine;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.api.id.PartitionId;
-
-public class HelixStateModelFactoryAdaptor<T extends StateModel> extends StateModelFactory<T> {
- final HelixStateModelFactory<T> _factory;
-
- public HelixStateModelFactoryAdaptor(HelixStateModelFactory<T> factory) {
- _factory = factory;
- }
-
- @Override
- public T createNewStateModel(String partitionName) {
- return _factory.createNewStateModel(PartitionId.from(partitionName));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
index ca67d42..954fba8 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
@@ -24,12 +24,14 @@ import java.util.Map;
import org.apache.helix.HelixException;
import org.apache.helix.NotificationContext;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
import org.apache.helix.messaging.handling.HelixTaskExecutor;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.model.Message;
import org.apache.log4j.Logger;
-public class ScheduledTaskStateModel extends StateModel {
+public class ScheduledTaskStateModel extends TransitionHandler {
static final String DEFAULT_INITIAL_STATE = "OFFLINE";
Logger logger = Logger.getLogger(ScheduledTaskStateModel.class);
@@ -37,27 +39,27 @@ public class ScheduledTaskStateModel extends StateModel {
// StateModel with initial state other than OFFLINE should override this field
protected String _currentState = DEFAULT_INITIAL_STATE;
final ScheduledTaskStateModelFactory _factory;
- final String _partitionName;
+ final PartitionId _partition;
final HelixTaskExecutor _executor;
public ScheduledTaskStateModel(ScheduledTaskStateModelFactory factory,
- HelixTaskExecutor executor, String partitionName) {
+ HelixTaskExecutor executor, PartitionId partition) {
_factory = factory;
- _partitionName = partitionName;
+ _partition = partition;
_executor = executor;
}
@Transition(to = "COMPLETED", from = "OFFLINE")
public void onBecomeCompletedFromOffline(Message message, NotificationContext context)
throws InterruptedException {
- logger.info(_partitionName + " onBecomeCompletedFromOffline");
+ logger.info(_partition + " onBecomeCompletedFromOffline");
// System.err.println("\t\t" + _partitionName + " onBecomeCompletedFromOffline");
// Construct the inner task message from the mapfields of scheduledTaskQueue resource group
Map<String, String> messageInfo =
message.getRecord().getMapField(Message.Attributes.INNER_MESSAGE.toString());
- ZNRecord record = new ZNRecord(_partitionName);
+ ZNRecord record = new ZNRecord(_partition.stringify());
record.getSimpleFields().putAll(messageInfo);
Message taskMessage = new Message(record);
if (logger.isDebugEnabled()) {
@@ -67,49 +69,49 @@ public class ScheduledTaskStateModel extends StateModel {
_executor.createMessageHandler(taskMessage, new NotificationContext(null));
if (handler == null) {
throw new HelixException("Task message " + taskMessage.getMsgType()
- + " handler not found, task id " + _partitionName);
+ + " handler not found, task id " + _partition);
}
// Invoke the internal handler to complete the task
handler.handleMessage();
- logger.info(_partitionName + " onBecomeCompletedFromOffline completed");
+ logger.info(_partition + " onBecomeCompletedFromOffline completed");
}
@Transition(to = "OFFLINE", from = "COMPLETED")
public void onBecomeOfflineFromCompleted(Message message, NotificationContext context) {
- logger.info(_partitionName + " onBecomeOfflineFromCompleted");
+ logger.info(_partition + " onBecomeOfflineFromCompleted");
}
@Transition(to = "DROPPED", from = "COMPLETED")
public void onBecomeDroppedFromCompleted(Message message, NotificationContext context) {
- logger.info(_partitionName + " onBecomeDroppedFromCompleted");
+ logger.info(_partition + " onBecomeDroppedFromCompleted");
removeFromStatemodelFactory();
}
@Transition(to = "DROPPED", from = "OFFLINE")
public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
throws InterruptedException {
- logger.info(_partitionName + " onBecomeDroppedFromScheduled");
+ logger.info(_partition + " onBecomeDroppedFromScheduled");
removeFromStatemodelFactory();
}
@Transition(to = "OFFLINE", from = "ERROR")
public void onBecomeOfflineFromError(Message message, NotificationContext context)
throws InterruptedException {
- logger.info(_partitionName + " onBecomeOfflineFromError");
+ logger.info(_partition + " onBecomeOfflineFromError");
}
@Override
public void reset() {
- logger.info(_partitionName + " ScheduledTask reset");
+ logger.info(_partition + " ScheduledTask reset");
removeFromStatemodelFactory();
}
// We need this to prevent state model leak
private void removeFromStatemodelFactory() {
- if (_factory.getStateModel(_partitionName) != null) {
- _factory.removeStateModel(_partitionName);
+ if (_factory.getTransitionHandler(_partition) != null) {
+ _factory.removeTransitionHandler(_partition);
} else {
- logger.warn(_partitionName + " not found in ScheduledTaskStateModelFactory");
+ logger.warn(_partition + " not found in ScheduledTaskStateModelFactory");
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
index a205910..e2bc461 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
@@ -19,10 +19,12 @@ package org.apache.helix.participant.statemachine;
* under the License.
*/
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.id.PartitionId;
import org.apache.helix.messaging.handling.HelixTaskExecutor;
import org.apache.log4j.Logger;
-public class ScheduledTaskStateModelFactory extends StateModelFactory<ScheduledTaskStateModel> {
+public class ScheduledTaskStateModelFactory extends StateTransitionHandlerFactory<ScheduledTaskStateModel> {
Logger logger = Logger.getLogger(ScheduledTaskStateModelFactory.class);
HelixTaskExecutor _executor;
@@ -32,8 +34,8 @@ public class ScheduledTaskStateModelFactory extends StateModelFactory<ScheduledT
}
@Override
- public ScheduledTaskStateModel createNewStateModel(String partitionName) {
- logger.info("Create state model for ScheduledTask " + partitionName);
- return new ScheduledTaskStateModel(this, _executor, partitionName);
+ public ScheduledTaskStateModel createStateTransitionHandler(PartitionId partition) {
+ logger.info("Create state model for ScheduledTask " + partition);
+ return new ScheduledTaskStateModel(this, _executor, partition);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
deleted file mode 100644
index 9717340..0000000
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package org.apache.helix.participant.statemachine;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.Message;
-import org.apache.log4j.Logger;
-
-public abstract class StateModel {
- static final String DEFAULT_INITIAL_STATE = "OFFLINE";
- Logger logger = Logger.getLogger(StateModel.class);
-
- // TODO Get default state from implementation or from state model annotation
- // StateModel with initial state other than OFFLINE should override this field
- protected String _currentState = DEFAULT_INITIAL_STATE;
-
- /**
- * requested-state is used (e.g. by task-framework) to request next state
- */
- protected String _requestedState = null;
-
- public String getCurrentState() {
- return _currentState;
- }
-
- // @transition(from='from', to='to')
- public void defaultTransitionHandler() {
- logger
- .error("Default default handler. The idea is to invoke this if no transition method is found. Yet to be implemented");
- }
-
- public boolean updateState(String newState) {
- _currentState = newState;
- return true;
- }
-
- /**
- * Get requested-state
- * @return requested-state
- */
- public String getRequestedState() {
- return _requestedState;
- }
-
- /**
- * Set requested-state
- * @param requestedState
- */
- public void setRequestedState(String requestedState) {
- _requestedState = requestedState;
- }
-
- /**
- * Called when error occurs in state transition
- * TODO:enforce subclass to write this
- * @param message
- * @param context
- * @param error
- */
- public void rollbackOnError(Message message, NotificationContext context,
- StateTransitionError error) {
-
- logger.error("Default rollback method invoked on error. Error Code: " + error.getCode());
-
- }
-
- /**
- * Called when the state model is reset
- */
- public void reset() {
- logger
- .warn("Default reset method invoked. Either because the process longer own this resource or session timedout");
- }
-
- /**
- * default transition for drop partition in error state
- * @param message
- * @param context
- * @throws InterruptedException
- */
- @Transition(to = "DROPPED", from = "ERROR")
- public void onBecomeDroppedFromError(Message message, NotificationContext context)
- throws Exception {
- logger.info("Default ERROR->DROPPED transition invoked.");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
deleted file mode 100644
index a74f67b..0000000
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.apache.helix.participant.statemachine;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.helix.messaging.handling.BatchMessageWrapper;
-
-/**
- * Replaced by {@link org.apache.helix.participant.statemachine.HelixStateModelFactory}
- */
-@Deprecated
-public abstract class StateModelFactory<T extends StateModel> {
- /**
- * mapping from partitionName to StateModel
- */
- private final ConcurrentMap<String, T> _stateModelMap = new ConcurrentHashMap<String, T>();
-
- /**
- * mapping from resourceName to BatchMessageWrapper
- */
- private final ConcurrentMap<String, BatchMessageWrapper> _batchMsgWrapperMap =
- new ConcurrentHashMap<String, BatchMessageWrapper>();
-
- /**
- * This method will be invoked only once per partitionName per session
- * @param partitionName
- * @return
- */
- public abstract T createNewStateModel(String partitionName);
-
- /**
- * Create a state model for a partition
- * @param partitionName
- */
- public T createAndAddStateModel(String partitionName) {
- T stateModel = createNewStateModel(partitionName);
- _stateModelMap.put(partitionName, stateModel);
- return stateModel;
- }
-
- /**
- * Get the state model for a partition
- * @param partitionName
- * @return state model if exists, null otherwise
- */
- public T getStateModel(String partitionName) {
- return _stateModelMap.get(partitionName);
- }
-
- /**
- * remove state model for a partition
- * @param partitionName
- * @return state model removed or null if not exist
- */
- public T removeStateModel(String partitionName) {
- return _stateModelMap.remove(partitionName);
- }
-
- /**
- * get partition set
- * @return partition key set
- */
- public Set<String> getPartitionSet() {
- return _stateModelMap.keySet();
- }
-
- /**
- * create a default batch-message-wrapper for a resource
- * @param resourceName
- * @return
- */
- public BatchMessageWrapper createBatchMessageWrapper(String resourceName) {
- return new BatchMessageWrapper();
- }
-
- /**
- * create a batch-message-wrapper for a resource and put it into map
- * @param resourceName
- * @return
- */
- public BatchMessageWrapper createAndAddBatchMessageWrapper(String resourceName) {
- BatchMessageWrapper wrapper = createBatchMessageWrapper(resourceName);
- _batchMsgWrapperMap.put(resourceName, wrapper);
- return wrapper;
- }
-
- /**
- * get batch-message-wrapper for a resource
- * @param resourceName
- * @return
- */
- public BatchMessageWrapper getBatchMessageWrapper(String resourceName) {
- return _batchMsgWrapperMap.get(resourceName);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java
index eddeaa5..d84b09a 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelParser.java
@@ -23,6 +23,7 @@ import java.lang.reflect.Method;
import java.util.Arrays;
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.model.Message;
/**
@@ -30,7 +31,7 @@ import org.apache.helix.model.Message;
*/
public class StateModelParser {
- public Method getMethodForTransition(Class<? extends StateModel> clazz, String fromState,
+ public Method getMethodForTransition(Class<? extends TransitionHandler> clazz, String fromState,
String toState, Class<?>[] paramTypes) {
Method method = getMethodForTransitionUsingAnnotation(clazz, fromState, toState, paramTypes);
if (method == null) {
@@ -48,7 +49,7 @@ public class StateModelParser {
* @param paramTypes
* @return Method if found else null
*/
- public Method getMethodForTransitionByConvention(Class<? extends StateModel> clazz,
+ public Method getMethodForTransitionByConvention(Class<? extends TransitionHandler> clazz,
String fromState, String toState, Class<?>[] paramTypes) {
Method methodToInvoke = null;
String methodName = "onBecome" + toState + "From" + fromState;
@@ -82,7 +83,7 @@ public class StateModelParser {
* @param paramTypes
* @return
*/
- public Method getMethodForTransitionUsingAnnotation(Class<? extends StateModel> clazz,
+ public Method getMethodForTransitionUsingAnnotation(Class<? extends TransitionHandler> clazz,
String fromState, String toState, Class<?>[] paramTypes) {
StateModelInfo stateModelInfo = clazz.getAnnotation(StateModelInfo.class);
Method methodToInvoke = null;
@@ -114,12 +115,12 @@ public class StateModelParser {
* @param clazz
* @return
*/
- public String getInitialState(Class<? extends StateModel> clazz) {
+ public String getInitialState(Class<? extends TransitionHandler> clazz) {
StateModelInfo stateModelInfo = clazz.getAnnotation(StateModelInfo.class);
if (stateModelInfo != null) {
return stateModelInfo.initialState();
} else {
- return StateModel.DEFAULT_INITIAL_STATE;
+ return TransitionHandler.DEFAULT_INITIAL_STATE;
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index 66abba6..45ee71c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -20,7 +20,7 @@ package org.apache.helix.task;
*/
import org.apache.helix.HelixManager;
-import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.task.TaskResult.Status;
import org.apache.log4j.Logger;
@@ -30,7 +30,7 @@ import org.apache.log4j.Logger;
*/
public class TaskRunner implements Runnable {
private static final Logger LOG = Logger.getLogger(TaskRunner.class);
- private final StateModel _taskStateModel;
+ private final TransitionHandler _taskStateModel;
private final HelixManager _manager;
private final String _taskName;
private final String _taskPartition;
@@ -50,7 +50,7 @@ public class TaskRunner implements Runnable {
// If true, indicates that the task has finished.
private volatile boolean _done = false;
- public TaskRunner(StateModel taskStateModel, Task task, String taskName, String taskPartition,
+ public TaskRunner(TransitionHandler taskStateModel, Task task, String taskName, String taskPartition,
String instance, HelixManager manager, String sessionId) {
_taskStateModel = taskStateModel;
_task = task;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index a44a8cb..69a1b88 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -28,14 +28,14 @@ import java.util.concurrent.ThreadFactory;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.log4j.Logger;
@StateModelInfo(states = "{'NOT USED BY HELIX'}", initialState = "INIT")
-public class TaskStateModel extends StateModel {
+public class TaskStateModel extends TransitionHandler {
private static final Logger LOG = Logger.getLogger(TaskStateModel.class);
private final HelixManager _manager;
private final ExecutorService _taskExecutor;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index 2537747..8fb5690 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -22,13 +22,13 @@ package org.apache.helix.task;
import java.util.Map;
import org.apache.helix.HelixManager;
+import org.apache.helix.api.StateTransitionHandlerFactory;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
/**
* Factory class for {@link TaskStateModel}.
*/
-public class TaskStateModelFactory extends HelixStateModelFactory<TaskStateModel> {
+public class TaskStateModelFactory extends StateTransitionHandlerFactory<TaskStateModel> {
private final HelixManager _manager;
private final Map<String, TaskFactory> _taskFactoryRegistry;
@@ -38,7 +38,7 @@ public class TaskStateModelFactory extends HelixStateModelFactory<TaskStateModel
}
@Override
- public TaskStateModel createNewStateModel(PartitionId partitionId) {
+ public TaskStateModel createStateTransitionHandler(PartitionId partitionId) {
return new TaskStateModel(_manager, _taskFactoryRegistry);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 6599b33..7402ca7 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -400,7 +400,7 @@ public class ClusterSetup {
return;
}
try {
- int replica = Integer.parseInt(idealState.getReplicas());
+ Integer.parseInt(idealState.getReplicas());
} catch (Exception e) {
_logger.error("", e);
return;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
index f51aa1d..8e80f15 100644
--- a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
+++ b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
@@ -19,6 +19,7 @@ package org.apache.helix;
* under the License.
*/
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
import org.apache.helix.mock.participant.DummyProcess.DummyStateModelFactory;
@@ -43,14 +44,14 @@ public class DummyProcessThread implements Runnable {
// StateMachineEngine genericStateMachineHandler =
// new StateMachineEngine();
StateMachineEngine stateMach = _manager.getStateMachineEngine();
- stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+ stateMach.registerStateModelFactory(StateModelDefId.MasterSlave, stateModelFactory);
DummyLeaderStandbyStateModelFactory stateModelFactory1 =
new DummyLeaderStandbyStateModelFactory(10);
DummyOnlineOfflineStateModelFactory stateModelFactory2 =
new DummyOnlineOfflineStateModelFactory(10);
- stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory1);
- stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
+ stateMach.registerStateModelFactory(StateModelDefId.LeaderStandby, stateModelFactory1);
+ stateMach.registerStateModelFactory(StateModelDefId.OnlineOffline, stateModelFactory2);
// _manager.getMessagingService()
// .registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
// genericStateMachineHandler);
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 0303f12..e6b64fc 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -31,6 +31,7 @@ import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.HelixTaskExecutor;
import org.apache.helix.messaging.handling.HelixTaskResult;
@@ -39,7 +40,6 @@ import org.apache.helix.messaging.handling.MessageTask;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.Message;
import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -189,7 +189,7 @@ public class Mocks {
}
- public static class MockStateModel extends StateModel {
+ public static class MockStateModel extends TransitionHandler {
boolean stateModelInvoked = false;
public void onBecomeMasterFromSlave(Message msg, NotificationContext context) {
@@ -202,7 +202,7 @@ public class Mocks {
}
@StateModelInfo(states = "{'OFFLINE','SLAVE','MASTER'}", initialState = "OFFINE")
- public static class MockStateModelAnnotated extends StateModel {
+ public static class MockStateModelAnnotated extends TransitionHandler {
boolean stateModelInvoked = false;
@Transition(from = "SLAVE", to = "MASTER")
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
index a3b16e5..d16417e 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
@@ -24,6 +24,7 @@ import org.apache.helix.Mocks.MockManager;
import org.apache.helix.Mocks.MockStateModel;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
@@ -36,7 +37,6 @@ import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.tools.StateModelConfigGenerator;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
@@ -75,10 +75,10 @@ public class TestHelixTaskExecutor {
CurrentState currentStateDelta = new CurrentState("TestDB");
currentStateDelta.setState(PartitionId.from("TestDB_0"), State.from("OFFLINE"));
- StateModelFactory<MockStateModel> stateModelFactory = new StateModelFactory<MockStateModel>() {
+ StateTransitionHandlerFactory<MockStateModel> stateModelFactory = new StateTransitionHandlerFactory<MockStateModel>() {
@Override
- public MockStateModel createNewStateModel(String partitionName) {
+ public MockStateModel createStateTransitionHandler(PartitionId partition) {
// TODO Auto-generated method stub
return new MockStateModel();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
index 43b4407..21a3ae7 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
@@ -26,6 +26,7 @@ import org.apache.helix.Mocks.MockStateModel;
import org.apache.helix.Mocks.MockStateModelAnnotated;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
@@ -38,7 +39,6 @@ import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.tools.StateModelConfigGenerator;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
@@ -115,11 +115,11 @@ public class TestHelixTaskHandler {
CurrentState currentStateDelta = new CurrentState("TestDB");
currentStateDelta.setState(PartitionId.from("TestDB_0"), State.from("OFFLINE"));
- StateModelFactory<MockStateModelAnnotated> stateModelFactory =
- new StateModelFactory<MockStateModelAnnotated>() {
+ StateTransitionHandlerFactory<MockStateModelAnnotated> stateModelFactory =
+ new StateTransitionHandlerFactory<MockStateModelAnnotated>() {
@Override
- public MockStateModelAnnotated createNewStateModel(String partitionName) {
+ public MockStateModelAnnotated createStateTransitionHandler(PartitionId partitionName) {
// TODO Auto-generated method stub
return new MockStateModelAnnotated();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/TestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index 879e727..4a9139f 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -37,8 +37,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IDefaultNameSpace;
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkServer;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.commons.io.FileUtils;
@@ -48,7 +46,6 @@ import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.manager.zk.CallbackHandler;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -772,30 +769,4 @@ public class TestHelper {
System.out.println(sb.toString());
}
- public static void printZkListeners(ZkClient client) throws Exception {
- Map<String, Set<IZkDataListener>> datalisteners = ZkTestHelper.getZkDataListener(client);
- Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(client);
-
- System.out.println("dataListeners {");
- for (String path : datalisteners.keySet()) {
- System.out.println("\t" + path + ": ");
- Set<IZkDataListener> set = datalisteners.get(path);
- for (IZkDataListener listener : set) {
- CallbackHandler handler = (CallbackHandler) listener;
- System.out.println("\t\t" + handler.getListener());
- }
- }
- System.out.println("}");
-
- System.out.println("childListeners {");
- for (String path : childListeners.keySet()) {
- System.out.println("\t" + path + ": ");
- Set<IZkChildListener> set = childListeners.get(path);
- for (IZkChildListener listener : set) {
- CallbackHandler handler = (CallbackHandler) listener;
- System.out.println("\t\t" + handler.getListener());
- }
- }
- System.out.println("}");
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
index 5f37845..93d168b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.api.id.StateModelFactoryId;
import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.manager.zk.MockController;
@@ -111,8 +112,8 @@ public class TestAddStateModelFactoryAfterConnect extends ZkTestBase {
// register "TestDB1_Factory" state model factory
// Logger.getRootLogger().setLevel(Level.INFO);
for (int i = 0; i < n; i++) {
- participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave",
- new MockMSModelFactory(), "TestDB1_Factory");
+ participants[i].getStateMachineEngine().registerStateModelFactory(StateModelDefId.MasterSlave,
+ "TestDB1_Factory", new MockMSModelFactory());
}
result =
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
index 6a6837a..789ae70 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
@@ -24,6 +24,8 @@ import java.util.Date;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.manager.zk.MockController;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -57,7 +59,7 @@ public class TestBatchMessageWrapper extends ZkTestBase {
class TestMockMSModelFactory extends MockMSModelFactory {
@Override
- public BatchMessageWrapper createBatchMessageWrapper(String resourceName) {
+ public BatchMessageWrapper createBatchMessageWrapper(ResourceId resource) {
return new MockBatchMsgWrapper();
}
}
@@ -100,7 +102,7 @@ public class TestBatchMessageWrapper extends ZkTestBase {
String instanceName = "localhost_" + (12918 + i);
ftys[i] = new TestMockMSModelFactory();
participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
- participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", ftys[i]);
+ participants[i].getStateMachineEngine().registerStateModelFactory(StateModelDefId.MasterSlave, ftys[i]);
participants[i].syncStart();
// wait for each participant to complete state transitions, so we have deterministic results
@@ -117,7 +119,7 @@ public class TestBatchMessageWrapper extends ZkTestBase {
}
// check batch-msg-wrapper counts
- MockBatchMsgWrapper wrapper = (MockBatchMsgWrapper) ftys[0].getBatchMessageWrapper("TestDB0");
+ MockBatchMsgWrapper wrapper = (MockBatchMsgWrapper) ftys[0].getBatchMessageWrapper(ResourceId.from("TestDB0"));
// System.out.println("startCount: " + wrapper._startCount);
Assert.assertEquals(wrapper._startCount, 3,
"Expect 3 batch.start: O->S, S->M, and M->S for 1st participant");
@@ -125,7 +127,7 @@ public class TestBatchMessageWrapper extends ZkTestBase {
Assert.assertEquals(wrapper._endCount, 3,
"Expect 3 batch.end: O->S, S->M, and M->S for 1st participant");
- wrapper = (MockBatchMsgWrapper) ftys[1].getBatchMessageWrapper("TestDB0");
+ wrapper = (MockBatchMsgWrapper) ftys[1].getBatchMessageWrapper(ResourceId.from("TestDB0"));
// System.out.println("startCount: " + wrapper._startCount);
Assert.assertEquals(wrapper._startCount, 2,
"Expect 2 batch.start: O->S and S->M for 2nd participant");
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
index abb2a7b..e4e844a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
@@ -27,13 +27,13 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.MockController;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.StateTransitionError;
import org.apache.helix.participant.statemachine.Transition;
@@ -145,7 +145,7 @@ public class TestCorrectnessOnConnectivityLoss {
@StateModelInfo(initialState = "OFFLINE", states = {
"MASTER", "SLAVE", "OFFLINE", "ERROR"
})
- public static class MyStateModel extends StateModel {
+ public static class MyStateModel extends TransitionHandler {
private final Map<String, Integer> _counts;
public MyStateModel(Map<String, Integer> counts) {
@@ -189,7 +189,7 @@ public class TestCorrectnessOnConnectivityLoss {
}
}
- public static class MyStateModelFactory extends HelixStateModelFactory<MyStateModel> {
+ public static class MyStateModelFactory extends StateTransitionHandlerFactory<MyStateModel> {
private final Map<String, Integer> _counts;
@@ -198,7 +198,7 @@ public class TestCorrectnessOnConnectivityLoss {
}
@Override
- public MyStateModel createNewStateModel(PartitionId partitionId) {
+ public MyStateModel createStateTransitionHandler(PartitionId partitionId) {
return new MyStateModel(_counts);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
deleted file mode 100644
index c2f9a5c..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package org.apache.helix.integration;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper;
-import org.apache.helix.manager.zk.MockParticipant;
-import org.apache.helix.manager.zk.MockMultiClusterController;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.testutil.ZkTestBase;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestDistributedClusterController extends ZkTestBase {
-
- @Test
- public void testDistributedClusterController() throws Exception {
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterNamePrefix = className + "_" + methodName;
- final int n = 5;
- final int clusterNb = 10;
-
- System.out
- .println("START " + clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
-
- // setup 10 clusters
- for (int i = 0; i < clusterNb; i++) {
- String clusterName = clusterNamePrefix + "0_" + i;
- String participantName = "localhost" + i;
- String resourceName = "TestDB" + i;
- TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
- participantName, // participant name prefix
- resourceName, // resource name prefix
- 1, // resources
- 8, // partitions per resource
- n, // number of nodes
- 3, // replicas
- "MasterSlave", true); // do rebalance
- }
-
- // setup controller cluster
- final String controllerClusterName = "CONTROLLER_" + clusterNamePrefix;
- TestHelper.setupCluster("CONTROLLER_" + clusterNamePrefix, _zkaddr, 0, // controller
- // port
- "controller", // participant name prefix
- clusterNamePrefix, // resource name prefix
- 1, // resources
- clusterNb, // partitions per resource
- n, // number of nodes
- 3, // replicas
- "LeaderStandby", true); // do rebalance
-
- // start distributed cluster controllers
- MockMultiClusterController[] controllers = new MockMultiClusterController[n];
- for (int i = 0; i < n; i++) {
- controllers[i] =
- new MockMultiClusterController(_zkaddr, controllerClusterName, "controller_" + i);
- controllers[i].syncStart();
- }
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(
- new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr, controllerClusterName),
- 30000);
- Assert.assertTrue(result, "Controller cluster NOT in ideal state");
-
- // start first cluster
- MockParticipant[] participants = new MockParticipant[n];
- final String firstClusterName = clusterNamePrefix + "0_0";
- for (int i = 0; i < n; i++) {
- String instanceName = "localhost0_" + (12918 + i);
- participants[i] = new MockParticipant(_zkaddr, firstClusterName, instanceName);
- participants[i].syncStart();
- }
-
- result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
- firstClusterName));
- Assert.assertTrue(result, "first cluster NOT in ideal state");
-
- // stop current leader in controller cluster
- ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(controllerClusterName, _baseAccessor);
- Builder keyBuilder = accessor.keyBuilder();
- LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
- String leaderName = leader.getId();
- int j = Integer.parseInt(leaderName.substring(leaderName.lastIndexOf('_') + 1));
- controllers[j].syncStop();
-
- // setup the second cluster
- MockParticipant[] participants2 = new MockParticipant[n];
- final String secondClusterName = clusterNamePrefix + "0_1";
- for (int i = 0; i < n; i++) {
- String instanceName = "localhost1_" + (12918 + i);
- participants2[i] = new MockParticipant(_zkaddr, secondClusterName, instanceName);
- participants2[i].syncStart();
- }
-
- result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
- secondClusterName));
- Assert.assertTrue(result, "second cluster NOT in ideal state");
-
- // clean up
- // wait for all zk callbacks done
- System.out.println("Cleaning up...");
- for (int i = 0; i < 5; i++) {
- result =
- ClusterStateVerifier
- .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr,
- controllerClusterName));
- controllers[i].syncStop();
- }
-
- for (int i = 0; i < 5; i++) {
- participants[i].syncStop();
- }
-
- System.out.println("END " + clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
index 19af9a7..648fc4c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
@@ -56,11 +56,8 @@ public class TestErrorPartition extends ZkTestBase {
String instanceName = "localhost_" + (12918 + i);
if (i == 0) {
- Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
- {
- put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
- }
- };
+ Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+ errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
participants[i].setTransition(new ErrTransition(errPartitions));
} else {
@@ -77,11 +74,8 @@ public class TestErrorPartition extends ZkTestBase {
_zkaddr, clusterName, errStates));
Assert.assertTrue(result);
- Map<String, Set<String>> errorStateMap = new HashMap<String, Set<String>>() {
- {
- put("TestDB0_4", TestHelper.setOf("localhost_12918"));
- }
- };
+ Map<String, Set<String>> errorStateMap = new HashMap<String, Set<String>>();
+ errorStateMap.put("TestDB0_4", TestHelper.setOf("localhost_12918"));
// verify "TestDB0_0", "localhost_12918" is in ERROR state
TestHelper.verifyState(clusterName, _zkaddr, errorStateMap, "ERROR");
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
index 3d02ae8..484ae8c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
@@ -30,6 +30,8 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.api.State;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.accessor.ClusterAccessor;
import org.apache.helix.api.config.ClusterConfig;
import org.apache.helix.api.config.ParticipantConfig;
@@ -46,8 +48,6 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.AutoModeISBuilder;
-import org.apache.helix.participant.statemachine.HelixStateModelFactory;
-import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.helix.testutil.ZkTestBase;
@@ -61,7 +61,7 @@ public class TestHelixConnection extends ZkTestBase {
@StateModelInfo(initialState = "OFFLINE", states = {
"MASTER", "SLAVE", "OFFLINE", "ERROR"
})
- public static class MockStateModel extends StateModel {
+ public static class MockStateModel extends TransitionHandler {
public MockStateModel() {
}
@@ -74,13 +74,13 @@ public class TestHelixConnection extends ZkTestBase {
}
}
- public static class MockStateModelFactory extends HelixStateModelFactory<MockStateModel> {
+ public static class MockStateModelFactory extends StateTransitionHandlerFactory<MockStateModel> {
public MockStateModelFactory() {
}
@Override
- public MockStateModel createNewStateModel(PartitionId partitionId) {
+ public MockStateModel createStateTransitionHandler(PartitionId partitionId) {
MockStateModel model = new MockStateModel();
return model;
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
index 496a16f..95673d4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
@@ -39,7 +39,10 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -54,8 +57,6 @@ import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.ConstraintItemBuilder;
import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.helix.testutil.ZkTestBase;
@@ -245,8 +246,6 @@ public class TestMessageThrottle2 extends ZkTestBase {
}
public void start() throws Exception {
-// helixManager =
-// new ZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, _zkaddr);
participant = new MockParticipant(_zkaddr, clusterName, instanceName);
{
// hack to set sessionTimeout
@@ -256,7 +255,7 @@ public class TestMessageThrottle2 extends ZkTestBase {
}
StateMachineEngine stateMach = participant.getStateMachineEngine();
- stateMach.registerStateModelFactory("MasterSlave", new MyStateModelFactory(participant));
+ stateMach.registerStateModelFactory(StateModelDefId.MasterSlave, new MyStateModelFactory(participant));
participant.connect();
StatusPrinter statusPrinter = new StatusPrinter();
@@ -271,13 +270,11 @@ public class TestMessageThrottle2 extends ZkTestBase {
@StateModelInfo(initialState = "OFFLINE", states = {
"MASTER", "SLAVE", "ERROR"
})
- public static class MyStateModel extends StateModel {
+ public static class MyStateModel extends TransitionHandler {
private static final Logger LOGGER = Logger.getLogger(MyStateModel.class);
- private final HelixManager helixManager;
public MyStateModel(HelixManager helixManager) {
- this.helixManager = helixManager;
}
@Transition(to = "SLAVE", from = "OFFLINE")
@@ -323,7 +320,7 @@ public class TestMessageThrottle2 extends ZkTestBase {
}
}
- static class MyStateModelFactory extends StateModelFactory<MyStateModel> {
+ static class MyStateModelFactory extends StateTransitionHandlerFactory<MyStateModel> {
private final HelixManager helixManager;
public MyStateModelFactory(HelixManager helixManager) {
@@ -331,7 +328,7 @@ public class TestMessageThrottle2 extends ZkTestBase {
}
@Override
- public MyStateModel createNewStateModel(String partitionName) {
+ public MyStateModel createStateTransitionHandler(PartitionId partitionName) {
return new MyStateModel(helixManager);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
index 08954e5..eb2bec1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
@@ -267,8 +267,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
cr.setSessionSpecific(false);
AsyncCallback asyncCallback = new MockAsyncCallback();
- int messagesSent =
- _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback, 60000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback, 60000);
AssertJUnit.assertTrue(asyncCallback.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage")
@@ -276,7 +275,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
AssertJUnit.assertTrue(asyncCallback.getMessageReplied().size() == 1);
AsyncCallback asyncCallback2 = new MockAsyncCallback();
- messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback2, 500);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback2, 500);
AssertJUnit.assertTrue(asyncCallback2.isTimedOut());
}
@@ -287,7 +286,6 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
for (int i = 0; i < NODE_NR; i++) {
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- String hostDest = "localhost_" + (START_PORT + i);
_participants[0].getMessagingService().registerMessageHandlerFactory(
factory.getMessageType(), factory);
@@ -307,8 +305,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
cr.setSessionSpecific(false);
AsyncCallback callback1 = new MockAsyncCallback();
- int messageSent1 =
- _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage")
@@ -316,32 +313,28 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR - 1);
AsyncCallback callback2 = new MockAsyncCallback();
- int messageSent2 = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 500);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 500);
AssertJUnit.assertTrue(callback2.isTimedOut());
cr.setPartition("TestDB_17");
AsyncCallback callback3 = new MockAsyncCallback();
- int messageSent3 =
- _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
AssertJUnit.assertTrue(callback3.getMessageReplied().size() == _replica - 1);
cr.setPartition("TestDB_15");
AsyncCallback callback4 = new MockAsyncCallback();
- int messageSent4 =
- _participants[0].getMessagingService().sendAndWait(cr, msg, callback4, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback4, 10000);
AssertJUnit.assertTrue(callback4.getMessageReplied().size() == _replica);
cr.setPartitionState("SLAVE");
AsyncCallback callback5 = new MockAsyncCallback();
- int messageSent5 =
- _participants[0].getMessagingService().sendAndWait(cr, msg, callback5, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback5, 10000);
AssertJUnit.assertTrue(callback5.getMessageReplied().size() == _replica - 1);
cr.setDataSource(DataSource.IDEALSTATES);
AsyncCallback callback6 = new MockAsyncCallback();
- int messageSent6 =
- _participants[0].getMessagingService().sendAndWait(cr, msg, callback6, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback6, 10000);
AssertJUnit.assertTrue(callback6.getMessageReplied().size() == _replica - 1);
}
@@ -351,7 +344,6 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
for (int i = 0; i < NODE_NR; i++) {
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- String hostDest = "localhost_" + (START_PORT + i);
_participants[i].getMessagingService().registerMessageHandlerFactory(
factory.getMessageType(), factory);
@@ -372,8 +364,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
cr.setSessionSpecific(false);
cr.setSelfExcluded(false);
AsyncCallback callback1 = new MockAsyncCallback();
- int messageSent1 =
- _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR);
AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
@@ -387,7 +378,6 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
for (int i = 0; i < NODE_NR; i++) {
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- String hostDest = "localhost_" + (START_PORT + i);
_participants[i].getMessagingService().registerMessageHandlerFactory(
factory.getMessageType(), factory);
@@ -408,8 +398,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
cr.setSessionSpecific(false);
AsyncCallback callback1 = new MockAsyncCallback();
- int messagesSent =
- _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
@@ -420,7 +409,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
msg.setMessageId(msgId);
cr.setPartition("TestDB_17");
AsyncCallback callback2 = new MockAsyncCallback();
- messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 10000);
AssertJUnit.assertTrue(callback2.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
@@ -432,7 +421,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
msg.setMessageId(msgId);
cr.setPartitionState("SLAVE");
AsyncCallback callback3 = new MockAsyncCallback();
- messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
+ _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
AssertJUnit.assertTrue(callback3.getMessageReplied().get(0).getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
.indexOf(hostSrc) != -1);
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestMultiClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMultiClusterController.java b/helix-core/src/test/java/org/apache/helix/integration/TestMultiClusterController.java
new file mode 100644
index 0000000..8945843
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMultiClusterController.java
@@ -0,0 +1,144 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockMultiClusterController;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.testutil.ZkTestBase;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestMultiClusterController extends ZkTestBase {
+
+ @Test
+ public void test() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterNamePrefix = className + "_" + methodName;
+ final int n = 5;
+ final int clusterNb = 10;
+
+ System.out
+ .println("START " + clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
+
+ // setup 10 clusters
+ for (int i = 0; i < clusterNb; i++) {
+ String clusterName = clusterNamePrefix + "0_" + i;
+ String participantName = "localhost" + i;
+ String resourceName = "TestDB" + i;
+ TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
+ participantName, // participant name prefix
+ resourceName, // resource name prefix
+ 1, // resources
+ 8, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave", true); // do rebalance
+ }
+
+ // setup controller cluster
+ final String controllerClusterName = "CONTROLLER_" + clusterNamePrefix;
+ TestHelper.setupCluster("CONTROLLER_" + clusterNamePrefix, _zkaddr, 0, // controller
+ // port
+ "controller", // participant name prefix
+ clusterNamePrefix, // resource name prefix
+ 1, // resources
+ clusterNb, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "LeaderStandby", true); // do rebalance
+
+ // start distributed cluster controllers
+ MockMultiClusterController[] controllers = new MockMultiClusterController[n];
+ for (int i = 0; i < n; i++) {
+ controllers[i] =
+ new MockMultiClusterController(_zkaddr, controllerClusterName, "controller_" + i);
+ controllers[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr, controllerClusterName),
+ 30000);
+ Assert.assertTrue(result, "Controller cluster NOT in ideal state");
+
+ // start first cluster
+ MockParticipant[] participants = new MockParticipant[n];
+ final String firstClusterName = clusterNamePrefix + "0_0";
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost0_" + (12918 + i);
+ participants[i] = new MockParticipant(_zkaddr, firstClusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
+ firstClusterName));
+ Assert.assertTrue(result, "first cluster NOT in ideal state");
+
+ // stop current leader in controller cluster
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(controllerClusterName, _baseAccessor);
+ Builder keyBuilder = accessor.keyBuilder();
+ LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+ String leaderName = leader.getId();
+ int j = Integer.parseInt(leaderName.substring(leaderName.lastIndexOf('_') + 1));
+ controllers[j].syncStop();
+
+ // setup the second cluster
+ MockParticipant[] participants2 = new MockParticipant[n];
+ final String secondClusterName = clusterNamePrefix + "0_1";
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost1_" + (12918 + i);
+ participants2[i] = new MockParticipant(_zkaddr, secondClusterName, instanceName);
+ participants2[i].syncStart();
+ }
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(_zkaddr,
+ secondClusterName));
+ Assert.assertTrue(result, "second cluster NOT in ideal state");
+
+ // clean up
+ // wait for all zk callbacks done
+ System.out.println("Cleaning up...");
+ for (int i = 0; i < 5; i++) {
+ result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr,
+ controllerClusterName));
+ controllers[i].syncStop();
+ }
+
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
index 105633a..d5f5458 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
@@ -22,6 +22,7 @@ package org.apache.helix.integration;
import java.util.Date;
import org.apache.helix.TestHelper;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.manager.zk.MockController;
import org.apache.helix.mock.participant.MockBootstrapModelFactory;
@@ -65,7 +66,7 @@ public class TestNonOfflineInitState extends ZkTestBase {
// add a state model with non-OFFLINE initial state
StateMachineEngine stateMach = participants[i].getStateMachineEngine();
MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory();
- stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory);
+ stateMach.registerStateModelFactory(StateModelDefId.from("Bootstrap"), bootstrapFactory);
participants[i].syncStart();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
index 823a9ce..0962921 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java
@@ -28,6 +28,10 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.manager.zk.MockController;
import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -38,8 +42,6 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.ConstraintItemBuilder;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.testutil.ZkTestBase;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
@@ -52,7 +54,7 @@ public class TestPartitionLevelTransitionConstraint extends ZkTestBase {
final Queue<Message> _msgOrderList = new ConcurrentLinkedQueue<Message>();
- public class BootstrapStateModel extends StateModel {
+ public class BootstrapStateModel extends TransitionHandler {
public void onBecomeBootstrapFromOffline(Message message, NotificationContext context) {
LOG.info("Become Bootstrap from Offline");
_msgOrderList.add(message);
@@ -85,10 +87,10 @@ public class TestPartitionLevelTransitionConstraint extends ZkTestBase {
}
- public class BootstrapStateModelFactory extends StateModelFactory<BootstrapStateModel> {
+ public class BootstrapStateModelFactory extends StateTransitionHandlerFactory<BootstrapStateModel> {
@Override
- public BootstrapStateModel createNewStateModel(String stateUnitKey) {
+ public BootstrapStateModel createStateTransitionHandler(PartitionId partition) {
BootstrapStateModel model = new BootstrapStateModel();
return model;
}
@@ -145,7 +147,7 @@ public class TestPartitionLevelTransitionConstraint extends ZkTestBase {
String instanceName1 = "localhost_12918";
participants[0] = new MockParticipant(_zkaddr, clusterName, instanceName1);
- participants[0].getStateMachineEngine().registerStateModelFactory("Bootstrap",
+ participants[0].getStateMachineEngine().registerStateModelFactory(StateModelDefId.from("Bootstrap"),
new BootstrapStateModelFactory());
participants[0].syncStart();
@@ -158,7 +160,7 @@ public class TestPartitionLevelTransitionConstraint extends ZkTestBase {
// start 2nd participant which will be the master for Test0_0
String instanceName2 = "localhost_12919";
participants[1] = new MockParticipant(_zkaddr, clusterName, instanceName2);
- participants[1].getStateMachineEngine().registerStateModelFactory("Bootstrap",
+ participants[1].getStateMachineEngine().registerStateModelFactory(StateModelDefId.from("Bootstrap"),
new BootstrapStateModelFactory());
participants[1].syncStart();
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
index 06a2b56..7eb054d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
@@ -33,6 +33,10 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.IdealState;
@@ -41,8 +45,6 @@ import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.ConstraintItemBuilder;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.testutil.ZkTestBase;
import org.apache.log4j.Logger;
import org.testng.Assert;
@@ -152,7 +154,7 @@ public class TestPreferenceListAsQueue extends ZkTestBase {
participants[i] =
HelixManagerFactory.getZKHelixManager(_clusterName, instanceInfoArray[i],
InstanceType.PARTICIPANT, _zkaddr);
- participants[i].getStateMachineEngine().registerStateModelFactory(_stateModel,
+ participants[i].getStateMachineEngine().registerStateModelFactory(StateModelDefId.from(_stateModel),
new PrefListTaskOnlineOfflineStateModelFactory());
participants[i].connect();
}
@@ -419,7 +421,7 @@ public class TestPreferenceListAsQueue extends ZkTestBase {
* should be run, and then the instance should be removed from the preference list for the task
* (padded by spaces). All other transitions are no-ops.
*/
- public class PrefListTaskOnlineOfflineStateModel extends StateModel {
+ public class PrefListTaskOnlineOfflineStateModel extends TransitionHandler {
/**
* Run the task. The parallelism of this is dictated by the constraints that are set.
* @param message
@@ -477,9 +479,9 @@ public class TestPreferenceListAsQueue extends ZkTestBase {
}
public class PrefListTaskOnlineOfflineStateModelFactory extends
- StateModelFactory<PrefListTaskOnlineOfflineStateModel> {
+ StateTransitionHandlerFactory<PrefListTaskOnlineOfflineStateModel> {
@Override
- public PrefListTaskOnlineOfflineStateModel createNewStateModel(String partitionName) {
+ public PrefListTaskOnlineOfflineStateModel createStateTransitionHandler(PartitionId partitionName) {
return new PrefListTaskOnlineOfflineStateModel();
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
index 5804744..f3fe6f2 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
@@ -59,12 +59,9 @@ public class TestResetInstance extends ZkTestBase {
new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
- Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
- {
- put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
- put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
- }
- };
+ Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+ errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+ errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
// start mock participants
MockParticipant[] participants = new MockParticipant[n];
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
index 4855b3d..c7386b4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
@@ -84,12 +84,9 @@ public class TestResetPartitionState extends ZkTestBase {
new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
- Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
- {
- put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
- put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
- }
- };
+ Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+ errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+ errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
// start mock participants
MockParticipant[] participants = new MockParticipant[n];
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
index 7d28931..92f1c43 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
@@ -58,12 +58,9 @@ public class TestResetResource extends ZkTestBase {
new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
- Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
- {
- put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
- put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
- }
- };
+ Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
+ errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+ errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
// start mock participants
MockParticipant[] participants = new MockParticipant[n];
http://git-wip-us.apache.org/repos/asf/helix/blob/4ae1ff79/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 89af602..8accc75 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -115,7 +115,6 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
public HelixTaskResult handleMessage() throws InterruptedException {
HelixTaskResult result = new HelixTaskResult();
result.setSuccess(true);
- String destName = _message.getTgtName();
String partitionName = _message.getPartitionName();
result.getTaskResultMap().put("Message", _message.getMsgId());
synchronized (_results) {