You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC
[14/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
new file mode 100644
index 0000000..5b962f8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -0,0 +1,1098 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import static org.apache.helix.HelixConstants.ChangeType.CONFIG;
+import static org.apache.helix.HelixConstants.ChangeType.CURRENT_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
+import static org.apache.helix.HelixConstants.ChangeType.HEALTH;
+import static org.apache.helix.HelixConstants.ChangeType.IDEAL_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
+import static org.apache.helix.HelixConstants.ChangeType.MESSAGE;
+import static org.apache.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
+
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.DataAccessor;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
+import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.monitoring.ZKPathDataDumpTask;
+import org.apache.helix.participant.DistClusterControllerElection;
+import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.ZNRecordJsonSerializer;
+import org.apache.helix.store.zk.ZKPropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.tools.PropertiesReader;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+
+public class ZKHelixManager implements HelixManager
+{
+ private static Logger logger =
+ Logger.getLogger(ZKHelixManager.class);
+ private static final int RETRY_LIMIT = 3;
+ private static final int CONNECTIONTIMEOUT = 60 * 1000;
+ private final String _clusterName;
+ private final String _instanceName;
+ private final String _zkConnectString;
+ private static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
+ private ZKDataAccessor _accessor;
+ private ZKHelixDataAccessor _helixAccessor;
+ private ConfigAccessor _configAccessor;
+ protected ZkClient _zkClient;
+ private final List<CallbackHandler> _handlers;
+ private final ZkStateChangeListener _zkStateChangeListener;
+ private final InstanceType _instanceType;
+ volatile String _sessionId;
+ private Timer _timer;
+ private CallbackHandler _leaderElectionHandler;
+ private ParticipantHealthReportCollectorImpl _participantHealthCheckInfoCollector;
+ private final DefaultMessagingService _messagingService;
+ private ZKHelixAdmin _managementTool;
+ private final String _version;
+ private final StateMachineEngine _stateMachEngine;
+ private int _sessionTimeout;
+ private PropertyStore<ZNRecord> _propertyStore;
+ private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
+ private final List<HelixTimerTask> _controllerTimerTasks;
+ private BaseDataAccessor<ZNRecord> _baseDataAccessor;
+ List<PreConnectCallback> _preConnectCallbacks =
+ new LinkedList<PreConnectCallback>();
+ ZKPropertyTransferServer _transferServer = null;
+
+ public ZKHelixManager(String clusterName,
+ String instanceName,
+ InstanceType instanceType,
+ String zkConnectString) throws Exception
+ {
+ logger.info("Create a zk-based cluster manager. clusterName:" + clusterName
+ + ", instanceName:" + instanceName + ", type:" + instanceType + ", zkSvr:"
+ + zkConnectString);
+ int sessionTimeoutInt = -1;
+ try
+ {
+ sessionTimeoutInt =
+ Integer.parseInt(System.getProperty("zk.session.timeout", ""
+ + DEFAULT_SESSION_TIMEOUT));
+ }
+ catch (NumberFormatException e)
+ {
+ logger.warn("Exception while parsing session timeout: "
+ + System.getProperty("zk.session.timeout", "" + DEFAULT_SESSION_TIMEOUT));
+ }
+ if (sessionTimeoutInt > 0)
+ {
+ _sessionTimeout = sessionTimeoutInt;
+ }
+ else
+ {
+ _sessionTimeout = DEFAULT_SESSION_TIMEOUT;
+ }
+ if (instanceName == null)
+ {
+ try
+ {
+ instanceName =
+ InetAddress.getLocalHost().getCanonicalHostName() + "-"
+ + instanceType.toString();
+ }
+ catch (UnknownHostException e)
+ {
+ // can ignore it
+ logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable",
+ e);
+ instanceName = "UNKNOWN";
+ }
+ }
+
+ _clusterName = clusterName;
+ _instanceName = instanceName;
+ _instanceType = instanceType;
+ _zkConnectString = zkConnectString;
+ _zkStateChangeListener = new ZkStateChangeListener(this);
+ _timer = null;
+
+ _handlers = new ArrayList<CallbackHandler>();
+
+ _messagingService = new DefaultMessagingService(this);
+
+ _version =
+ new PropertiesReader("cluster-manager-version.properties").getProperty("clustermanager.version");
+
+ _stateMachEngine = new HelixStateMachineEngine(this);
+
+ // add all timer tasks
+ _controllerTimerTasks = new ArrayList<HelixTimerTask>();
+ if (_instanceType == InstanceType.CONTROLLER)
+ {
+ _controllerTimerTasks.add(new HealthStatsAggregationTask(this));
+ }
+ }
+
+ private boolean isInstanceSetup()
+ {
+ if (_instanceType == InstanceType.PARTICIPANT
+ || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
+ {
+ boolean isValid =
+ _zkClient.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS,
+ _clusterName,
+ ConfigScopeProperty.PARTICIPANT.toString(),
+ _instanceName))
+ && _zkClient.exists(PropertyPathConfig.getPath(PropertyType.MESSAGES,
+ _clusterName,
+ _instanceName))
+ && _zkClient.exists(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+ _clusterName,
+ _instanceName))
+ && _zkClient.exists(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES,
+ _clusterName,
+ _instanceName))
+ && _zkClient.exists(PropertyPathConfig.getPath(PropertyType.ERRORS,
+ _clusterName,
+ _instanceName));
+
+ return isValid;
+ }
+ return true;
+ }
+
+ @Override
+ public void addIdealStateChangeListener(final IdealStateChangeListener listener) throws Exception
+ {
+ logger.info("ClusterManager.addIdealStateChangeListener()");
+ checkConnected();
+ final String path =
+ PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName);
+ CallbackHandler callbackHandler =
+ createCallBackHandler(path,
+ listener,
+ new EventType[] { EventType.NodeDataChanged,
+ EventType.NodeDeleted, EventType.NodeCreated },
+ IDEAL_STATE);
+ addListener(callbackHandler);
+ }
+
+ @Override
+ public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception
+ {
+ logger.info("ClusterManager.addLiveInstanceChangeListener()");
+ checkConnected();
+ final String path = _helixAccessor.keyBuilder().liveInstances().getPath();
+ CallbackHandler callbackHandler =
+ createCallBackHandler(path,
+ listener,
+ new EventType[] { EventType.NodeDataChanged, EventType.NodeChildrenChanged,
+ EventType.NodeDeleted, EventType.NodeCreated },
+ LIVE_INSTANCE);
+ addListener(callbackHandler);
+ }
+
+ @Override
+ public void addConfigChangeListener(ConfigChangeListener listener)
+ {
+ logger.info("ClusterManager.addConfigChangeListener()");
+ checkConnected();
+ final String path =
+ PropertyPathConfig.getPath(PropertyType.CONFIGS,
+ _clusterName,
+ ConfigScopeProperty.PARTICIPANT.toString());
+
+ CallbackHandler callbackHandler =
+ createCallBackHandler(path,
+ listener,
+ new EventType[] { EventType.NodeChildrenChanged },
+ CONFIG);
+ addListener(callbackHandler);
+
+ }
+
+ // TODO: Decide if do we still need this since we are exposing
+ // ClusterMessagingService
+ @Override
+ public void addMessageListener(MessageListener listener, String instanceName)
+ {
+ logger.info("ClusterManager.addMessageListener() " + instanceName);
+ checkConnected();
+ final String path = _helixAccessor.keyBuilder().messages(instanceName).getPath();
+ CallbackHandler callbackHandler =
+ createCallBackHandler(path,
+ listener,
+ new EventType[] { EventType.NodeChildrenChanged,
+ EventType.NodeDeleted, EventType.NodeCreated },
+ MESSAGE);
+ addListener(callbackHandler);
+ }
+
+ void addControllerMessageListener(MessageListener listener)
+ {
+ logger.info("ClusterManager.addControllerMessageListener()");
+ checkConnected();
+ final String path = _helixAccessor.keyBuilder().controllerMessages().getPath();
+
+ CallbackHandler callbackHandler =
+ createCallBackHandler(path,
+ listener,
+ new EventType[] { EventType.NodeChildrenChanged,
+ EventType.NodeDeleted, EventType.NodeCreated },
+ MESSAGES_CONTROLLER);
+ addListener(callbackHandler);
+ }
+
+ @Override
+ public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
+ String instanceName,
+ String sessionId)
+ {
+ logger.info("ClusterManager.addCurrentStateChangeListener() " + instanceName + " "
+ + sessionId);
+ checkConnected();
+ final String path =
+ _helixAccessor.keyBuilder().currentStates(instanceName, sessionId).getPath();
+
+ CallbackHandler callbackHandler =
+ createCallBackHandler(path,
+ listener,
+ new EventType[] { EventType.NodeChildrenChanged,
+ EventType.NodeDeleted, EventType.NodeCreated },
+ CURRENT_STATE);
+ addListener(callbackHandler);
+ }
+
+ @Override
+ public void addHealthStateChangeListener(HealthStateChangeListener listener,
+ String instanceName)
+ {
+ // System.out.println("ZKClusterManager.addHealthStateChangeListener()");
+ // TODO: re-form this for stats checking
+ logger.info("ClusterManager.addHealthStateChangeListener()" + instanceName);
+ checkConnected();
+ final String path = _helixAccessor.keyBuilder().healthReports(instanceName).getPath();
+
+ CallbackHandler callbackHandler =
+ createCallBackHandler(path, listener, new EventType[] {
+ EventType.NodeChildrenChanged, EventType.NodeDataChanged,
+ EventType.NodeDeleted, EventType.NodeCreated }, HEALTH);
+ addListener(callbackHandler);
+ }
+
+ @Override
+ public void addExternalViewChangeListener(ExternalViewChangeListener listener)
+ {
+ logger.info("ClusterManager.addExternalViewChangeListener()");
+ checkConnected();
+ final String path = _helixAccessor.keyBuilder().externalViews().getPath();
+
+ CallbackHandler callbackHandler =
+ createCallBackHandler(path,
+ listener,
+ new EventType[] { EventType.NodeDataChanged,
+ EventType.NodeDeleted, EventType.NodeCreated },
+ EXTERNAL_VIEW);
+ addListener(callbackHandler);
+ }
+
+ @Override
+ public DataAccessor getDataAccessor()
+ {
+ checkConnected();
+ return _accessor;
+ }
+
+ @Override
+ public HelixDataAccessor getHelixDataAccessor()
+ {
+ checkConnected();
+ return _helixAccessor;
+ }
+
+ @Override
+ public ConfigAccessor getConfigAccessor()
+ {
+ checkConnected();
+ return _configAccessor;
+ }
+
+ @Override
+ public String getClusterName()
+ {
+ return _clusterName;
+ }
+
+ @Override
+ public String getInstanceName()
+ {
+ return _instanceName;
+ }
+
+ @Override
+ public void connect() throws Exception
+ {
+ logger.info("ClusterManager.connect()");
+ if (_zkStateChangeListener.isConnected())
+ {
+ logger.warn("Cluster manager " + _clusterName + " " + _instanceName
+ + " already connected");
+ return;
+ }
+
+ try
+ {
+ createClient(_zkConnectString);
+ _messagingService.onConnected();
+ }
+ catch (Exception e)
+ {
+ logger.error(e);
+ disconnect();
+ throw e;
+ }
+ }
+
+ @Override
+ public void disconnect()
+ {
+
+ if (!isConnected())
+ {
+ logger.warn("ClusterManager " + _instanceName + " already disconnected");
+ return;
+ }
+
+ logger.info("disconnect " + _instanceName + "(" + _instanceType + ") from "
+ + _clusterName);
+
+ /**
+ * shutdown thread pool first to avoid reset() being invoked in the middle of state
+ * transition
+ */
+ _messagingService.getExecutor().shutDown();
+ resetHandlers();
+
+ _helixAccessor.shutdown();
+
+ if (_leaderElectionHandler != null)
+ {
+ _leaderElectionHandler.reset();
+ }
+
+ if (_participantHealthCheckInfoCollector != null)
+ {
+ _participantHealthCheckInfoCollector.stop();
+ }
+
+ if (_timer != null)
+ {
+ _timer.cancel();
+ _timer = null;
+ }
+
+ if (_instanceType == InstanceType.CONTROLLER)
+ {
+ stopTimerTasks();
+ }
+
+ if (_propertyStore != null)
+ {
+ _propertyStore.stop();
+ }
+
+ // unsubscribe accessor from controllerChange
+ _zkClient.unsubscribeAll();
+
+ _zkClient.close();
+
+ // HACK seems that zkClient is not sending DISCONNECT event
+ _zkStateChangeListener.disconnect();
+ logger.info("Cluster manager: " + _instanceName + " disconnected");
+ }
+
+ @Override
+ public String getSessionId()
+ {
+ checkConnected();
+ return _sessionId;
+ }
+
+ @Override
+ public boolean isConnected()
+ {
+ return _zkStateChangeListener.isConnected();
+ }
+
+ @Override
+ public long getLastNotificationTime()
+ {
+ return -1;
+ }
+
+ @Override
+ public void addControllerListener(ControllerChangeListener listener)
+ {
+ checkConnected();
+ final String path = _helixAccessor.keyBuilder().controller().getPath();
+ logger.info("Add controller listener at: " + path);
+ CallbackHandler callbackHandler =
+ createCallBackHandler(path,
+ listener,
+ new EventType[] { EventType.NodeChildrenChanged,
+ EventType.NodeDeleted, EventType.NodeCreated },
+ ChangeType.CONTROLLER);
+
+ // System.out.println("add controller listeners to " + _instanceName +
+ // " for " + _clusterName);
+ // _handlers.add(callbackHandler);
+ addListener(callbackHandler);
+ }
+
+ @Override
+ public boolean removeListener(Object listener)
+ {
+ logger.info("remove listener: " + listener + " from " + _instanceName);
+
+ synchronized (this)
+ {
+ Iterator<CallbackHandler> iterator = _handlers.iterator();
+ while (iterator.hasNext())
+ {
+ CallbackHandler handler = iterator.next();
+ // simply compare reference
+ if (handler.getListener().equals(listener))
+ {
+ handler.reset();
+ iterator.remove();
+ }
+ }
+ }
+
+ return true;
+ }
+
+ private void addLiveInstance()
+ {
+ LiveInstance liveInstance = new LiveInstance(_instanceName);
+ liveInstance.setSessionId(_sessionId);
+ liveInstance.setHelixVersion(_version);
+ liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
+
+ logger.info("Add live instance: InstanceName: " + _instanceName + " Session id:"
+ + _sessionId);
+ Builder keyBuilder = _helixAccessor.keyBuilder();
+ if (!_helixAccessor.createProperty(keyBuilder.liveInstance(_instanceName),
+ liveInstance))
+ {
+ String errorMsg =
+ "Fail to create live instance node after waiting, so quit. instance:"
+ + _instanceName;
+ logger.warn(errorMsg);
+ throw new HelixException(errorMsg);
+
+ }
+ String currentStatePathParent =
+ PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+ _clusterName,
+ _instanceName,
+ getSessionId());
+
+ if (!_zkClient.exists(currentStatePathParent))
+ {
+ _zkClient.createPersistent(currentStatePathParent);
+ logger.info("Creating current state path " + currentStatePathParent);
+ }
+ }
+
+ private void startStatusUpdatedumpTask()
+ {
+ long initialDelay = 30 * 60 * 1000;
+ long period = 120 * 60 * 1000;
+ int timeThresholdNoChange = 180 * 60 * 1000;
+
+ if (_timer == null)
+ {
+ _timer = new Timer(true);
+ _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(this,
+ _zkClient,
+ timeThresholdNoChange),
+ initialDelay,
+ period);
+ }
+ }
+
+ private void createClient(String zkServers) throws Exception
+ {
+ String propertyStorePath =
+ PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
+
+ // by default use ZNRecordStreamingSerializer except for paths within the property
+ // store which expects raw byte[] serialization/deserialization
+ PathBasedZkSerializer zkSerializer =
+ ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer())
+ .serialize(propertyStorePath, new ByteArraySerializer())
+ .build();
+
+ _zkClient = new ZkClient(zkServers, _sessionTimeout, CONNECTIONTIMEOUT, zkSerializer);
+ _accessor = new ZKDataAccessor(_clusterName, _zkClient);
+
+ ZkBaseDataAccessor<ZNRecord> baseDataAccessor =
+ new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ if (_instanceType == InstanceType.PARTICIPANT)
+ {
+ String curStatePath =
+ PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+ _clusterName,
+ _instanceName);
+ _baseDataAccessor =
+ new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor,
+ Arrays.asList(curStatePath));
+ }
+ else if (_instanceType == InstanceType.CONTROLLER)
+ {
+ String extViewPath = PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW,
+
+ _clusterName);
+ _baseDataAccessor =
+ new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor,
+ Arrays.asList(extViewPath));
+
+ }
+ else
+ {
+ _baseDataAccessor = baseDataAccessor;
+ }
+
+ _helixAccessor =
+ new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor);
+ _configAccessor = new ConfigAccessor(_zkClient);
+ int retryCount = 0;
+
+ _zkClient.subscribeStateChanges(_zkStateChangeListener);
+ while (retryCount < RETRY_LIMIT)
+ {
+ try
+ {
+ _zkClient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS);
+ _zkStateChangeListener.handleStateChanged(KeeperState.SyncConnected);
+ _zkStateChangeListener.handleNewSession();
+ break;
+ }
+ catch (HelixException e)
+ {
+ logger.error("fail to createClient.", e);
+ throw e;
+ }
+ catch (Exception e)
+ {
+ retryCount++;
+
+ logger.error("fail to createClient. retry " + retryCount, e);
+ if (retryCount == RETRY_LIMIT)
+ {
+ throw e;
+ }
+ }
+ }
+ }
+
+ private CallbackHandler createCallBackHandler(String path,
+ Object listener,
+ EventType[] eventTypes,
+ ChangeType changeType)
+ {
+ if (listener == null)
+ {
+ throw new HelixException("Listener cannot be null");
+ }
+ return new CallbackHandler(this, _zkClient, path, listener, eventTypes, changeType);
+ }
+
+ /**
+ * This will be invoked when ever a new session is created<br/>
+ *
+ * case 1: the cluster manager was a participant carry over current state, add live
+ * instance, and invoke message listener; case 2: the cluster manager was controller and
+ * was a leader before do leader election, and if it becomes leader again, invoke ideal
+ * state listener, current state listener, etc. if it fails to become leader in the new
+ * session, then becomes standby; case 3: the cluster manager was controller and was NOT
+ * a leader before do leader election, and if it becomes leader, instantiate and invoke
+ * ideal state listener, current state listener, etc. if if fails to become leader in
+ * the new session, stay as standby
+ */
+
+ protected void handleNewSession()
+ {
+ boolean isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS);
+ while (!isConnected)
+ {
+ logger.error("Could NOT connect to zk server in " + CONNECTIONTIMEOUT + "ms. zkServer: "
+ + _zkConnectString + ", expiredSessionId: " + _sessionId + ", clusterName: "
+ + _clusterName);
+ isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
+ ZkConnection zkConnection = ((ZkConnection) _zkClient.getConnection());
+
+ synchronized (this)
+ {
+ _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
+ }
+ _accessor.reset();
+ _baseDataAccessor.reset();
+
+ resetHandlers();
+
+ logger.info("Handling new session, session id:" + _sessionId + ", instance:"
+ + _instanceName + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName);
+
+ logger.info(zkConnection.getZookeeper());
+
+ if (!ZKUtil.isClusterSetup(_clusterName, _zkClient))
+ {
+ throw new HelixException("Initial cluster structure is not set up for cluster:"
+ + _clusterName);
+ }
+ if (!isInstanceSetup())
+ {
+ throw new HelixException("Initial cluster structure is not set up for instance:"
+ + _instanceName + " instanceType:" + _instanceType);
+ }
+
+ if (_instanceType == InstanceType.PARTICIPANT
+ || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
+ {
+ handleNewSessionAsParticipant();
+ }
+
+ if (_instanceType == InstanceType.CONTROLLER
+ || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
+ {
+ addControllerMessageListener(_messagingService.getExecutor());
+ MessageHandlerFactory defaultControllerMsgHandlerFactory =
+ new DefaultControllerMessageHandlerFactory();
+ _messagingService.getExecutor()
+ .registerMessageHandlerFactory(defaultControllerMsgHandlerFactory.getMessageType(),
+ defaultControllerMsgHandlerFactory);
+ MessageHandlerFactory defaultSchedulerMsgHandlerFactory =
+ new DefaultSchedulerMessageHandlerFactory(this);
+ _messagingService.getExecutor()
+ .registerMessageHandlerFactory(defaultSchedulerMsgHandlerFactory.getMessageType(),
+ defaultSchedulerMsgHandlerFactory);
+ MessageHandlerFactory defaultParticipantErrorMessageHandlerFactory =
+ new DefaultParticipantErrorMessageHandlerFactory(this);
+ _messagingService.getExecutor()
+ .registerMessageHandlerFactory(defaultParticipantErrorMessageHandlerFactory.getMessageType(),
+ defaultParticipantErrorMessageHandlerFactory);
+
+ if (_leaderElectionHandler == null)
+ {
+ final String path =
+ PropertyPathConfig.getPath(PropertyType.CONTROLLER, _clusterName);
+
+ _leaderElectionHandler =
+ createCallBackHandler(path,
+ new DistClusterControllerElection(_zkConnectString),
+ new EventType[] { EventType.NodeChildrenChanged,
+ EventType.NodeDeleted, EventType.NodeCreated },
+ ChangeType.CONTROLLER);
+ }
+ else
+ {
+ _leaderElectionHandler.init();
+ }
+ }
+
+ if (_instanceType == InstanceType.PARTICIPANT
+ || _instanceType == InstanceType.CONTROLLER_PARTICIPANT
+ || (_instanceType == InstanceType.CONTROLLER && isLeader()))
+ {
+ initHandlers();
+ }
+ }
+
+ private void handleNewSessionAsParticipant()
+ {
+ // In case there is a live instance record on zookeeper
+ Builder keyBuilder = _helixAccessor.keyBuilder();
+
+ if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null)
+ {
+ logger.warn("Found another instance with same instanceName: " + _instanceName
+ + " in cluster " + _clusterName);
+ // Wait for a while, in case previous storage node exits unexpectedly
+ // and its liveinstance
+ // still hangs around until session timeout happens
+ try
+ {
+ Thread.sleep(_sessionTimeout + 5000);
+ }
+ catch (InterruptedException e)
+ {
+ logger.warn("Sleep interrupted while waiting for previous liveinstance to go away.",
+ e);
+ }
+
+ if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null)
+ {
+ String errorMessage =
+ "instance " + _instanceName + " already has a liveinstance in cluster "
+ + _clusterName;
+ logger.error(errorMessage);
+ throw new HelixException(errorMessage);
+ }
+ }
+ // Invoke the PreConnectCallbacks
+ for (PreConnectCallback callback : _preConnectCallbacks)
+ {
+ callback.onPreConnect();
+ }
+ addLiveInstance();
+ carryOverPreviousCurrentState();
+
+ // In case the cluster manager is running as a participant, setup message
+ // listener
+ _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
+ _stateMachEngine);
+ addMessageListener(_messagingService.getExecutor(), _instanceName);
+ addControllerListener(_helixAccessor);
+
+ if (_participantHealthCheckInfoCollector == null)
+ {
+ _participantHealthCheckInfoCollector =
+ new ParticipantHealthReportCollectorImpl(this, _instanceName);
+ _participantHealthCheckInfoCollector.start();
+ }
+ // start the participant health check timer, also create zk path for health
+ // check info
+ String healthCheckInfoPath =
+ _helixAccessor.keyBuilder().healthReports(_instanceName).getPath();
+ if (!_zkClient.exists(healthCheckInfoPath))
+ {
+ _zkClient.createPersistent(healthCheckInfoPath, true);
+ logger.info("Creating healthcheck info path " + healthCheckInfoPath);
+ }
+ }
+
+ @Override
+ public void addPreConnectCallback(PreConnectCallback callback)
+ {
+ logger.info("Adding preconnect callback");
+ _preConnectCallbacks.add(callback);
+ }
+
+ private void resetHandlers()
+ {
+ synchronized (this)
+ {
+ // get a copy of the list and iterate over the copy list
+ // in case handler.reset() will modify the original handler list
+ List<CallbackHandler> handlers = new ArrayList<CallbackHandler>();
+ handlers.addAll(_handlers);
+
+ for (CallbackHandler handler : handlers)
+ {
+ handler.reset();
+ logger.info("reset handler: " + handler.getPath() + " by "
+ + Thread.currentThread().getName());
+ }
+ }
+ }
+
+ private void initHandlers()
+ {
+ // may add new currentState and message listeners during init()
+ // so make a copy and iterate over the copy
+ synchronized (this)
+ {
+ List<CallbackHandler> handlers = new ArrayList<CallbackHandler>();
+ handlers.addAll(_handlers);
+ for (CallbackHandler handler : handlers)
+ {
+ handler.init();
+ }
+ }
+ }
+
+ private void addListener(CallbackHandler handler)
+ {
+ synchronized (this)
+ {
+ _handlers.add(handler);
+ logger.info("add handler: " + handler.getPath() + " by "
+ + Thread.currentThread().getName());
+ }
+ }
+
+ @Override
+ public boolean isLeader()
+ {
+ if (!isConnected())
+ {
+ return false;
+ }
+
+ if (_instanceType != InstanceType.CONTROLLER)
+ {
+ return false;
+ }
+
+ Builder keyBuilder = _helixAccessor.keyBuilder();
+ LiveInstance leader = _helixAccessor.getProperty(keyBuilder.controllerLeader());
+ if (leader == null)
+ {
+ return false;
+ }
+ else
+ {
+ String leaderName = leader.getInstanceName();
+ // TODO need check sessionId also, but in distributed mode, leader's
+ // sessionId is
+ // not equal to
+ // the leader znode's sessionId field which is the sessionId of the
+ // controller_participant that
+ // successfully creates the leader node
+ if (leaderName == null || !leaderName.equals(_instanceName))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void carryOverPreviousCurrentState()
+ {
+ Builder keyBuilder = _helixAccessor.keyBuilder();
+
+ List<String> subPaths =
+ _helixAccessor.getChildNames(keyBuilder.sessions(_instanceName));
+ for (String previousSessionId : subPaths)
+ {
+ List<CurrentState> previousCurrentStates =
+ _helixAccessor.getChildValues(keyBuilder.currentStates(_instanceName,
+ previousSessionId));
+
+ for (CurrentState previousCurrentState : previousCurrentStates)
+ {
+ if (!previousSessionId.equalsIgnoreCase(_sessionId))
+ {
+ logger.info("Carrying over old session:" + previousSessionId + " resource "
+ + previousCurrentState.getId() + " to new session:" + _sessionId);
+ String stateModelDefRef = previousCurrentState.getStateModelDefRef();
+ if (stateModelDefRef == null)
+ {
+ logger.error("pervious current state doesn't have a state model def. skip it. prevCS: "
+ + previousCurrentState);
+ continue;
+ }
+ StateModelDefinition stateModel =
+ _helixAccessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef));
+ for (String partitionName : previousCurrentState.getPartitionStateMap()
+ .keySet())
+ {
+
+ previousCurrentState.setState(partitionName, stateModel.getInitialState());
+ }
+ previousCurrentState.setSessionId(_sessionId);
+ _helixAccessor.setProperty(keyBuilder.currentState(_instanceName,
+ _sessionId,
+ previousCurrentState.getId()),
+ previousCurrentState);
+ }
+ }
+ }
+ // Deleted old current state
+ for (String previousSessionId : subPaths)
+ {
+ if (!previousSessionId.equalsIgnoreCase(_sessionId))
+ {
+ String path =
+ _helixAccessor.keyBuilder()
+ .currentStates(_instanceName, previousSessionId)
+ .getPath();
+ logger.info("Deleting previous current state. path: " + path + "/"
+ + previousSessionId);
+ _zkClient.deleteRecursive(path);
+
+ }
+ }
+ }
+
+ @Deprecated
+ @Override
+ public synchronized PropertyStore<ZNRecord> getPropertyStore()
+ {
+ checkConnected();
+
+ if (_propertyStore == null)
+ {
+ String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
+
+ // reuse the existing zkClient because its serializer will use raw serialization
+ // for paths of the property store.
+ _propertyStore =
+ new ZKPropertyStore<ZNRecord>(_zkClient, new ZNRecordJsonSerializer(), path);
+ }
+
+ return _propertyStore;
+ }
+
+ @Override
+ public synchronized ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore()
+ {
+ checkConnected();
+
+ if (_helixPropertyStore == null)
+ {
+ String path =
+ PropertyPathConfig.getPath(PropertyType.HELIX_PROPERTYSTORE, _clusterName);
+
+ _helixPropertyStore =
+ new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkClient),
+ path,
+ null);
+ }
+
+ return _helixPropertyStore;
+ }
+
+ @Override
+ public synchronized HelixAdmin getClusterManagmentTool()
+ {
+ checkConnected();
+ if (_zkClient != null)
+ {
+ _managementTool = new ZKHelixAdmin(_zkClient);
+ }
+ else
+ {
+ logger.error("Couldn't get ZKClusterManagementTool because zkClient is null");
+ }
+
+ return _managementTool;
+ }
+
+ @Override
+ public ClusterMessagingService getMessagingService()
+ {
+ // The caller can register message handler factories on messaging service before the
+ // helix manager is connected. Thus we do not do connected check here.
+ return _messagingService;
+ }
+
+ @Override
+ public ParticipantHealthReportCollector getHealthReportCollector()
+ {
+ checkConnected();
+ return _participantHealthCheckInfoCollector;
+ }
+
+ @Override
+ public InstanceType getInstanceType()
+ {
+ return _instanceType;
+ }
+
+ private void checkConnected()
+ {
+ if (!isConnected())
+ {
+ throw new HelixException("ClusterManager not connected. Call clusterManager.connect()");
+ }
+ }
+
+ @Override
+ public String getVersion()
+ {
+ return _version;
+ }
+
+ @Override
+ public StateMachineEngine getStateMachineEngine()
+ {
+ return _stateMachEngine;
+ }
+
+ protected List<CallbackHandler> getHandlers()
+ {
+ return _handlers;
+ }
+
+ // TODO: rename this and not expose this function as part of interface
+ @Override
+ public void startTimerTasks()
+ {
+ for (HelixTimerTask task : _controllerTimerTasks)
+ {
+ task.start();
+ }
+ startStatusUpdatedumpTask();
+ }
+
+ @Override
+ public void stopTimerTasks()
+ {
+ for (HelixTimerTask task : _controllerTimerTasks)
+ {
+ task.stop();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
new file mode 100644
index 0000000..e16ec82
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -0,0 +1,353 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+
+
+public final class ZKUtil
+{
+ private static Logger logger = Logger.getLogger(ZKUtil.class);
+ private static int RETRYLIMIT = 3;
+
+ private ZKUtil()
+ {
+ }
+
+ public static boolean isClusterSetup(String clusterName, ZkClient zkClient)
+ {
+ if (clusterName == null || zkClient == null)
+ {
+ return false;
+ }
+
+ boolean isValid =
+ zkClient.exists(PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName))
+ && zkClient.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS,
+ clusterName,
+ ConfigScopeProperty.CLUSTER.toString(),
+ clusterName))
+ && zkClient.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS,
+ clusterName,
+ ConfigScopeProperty.PARTICIPANT.toString()))
+ && zkClient.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS,
+ clusterName,
+ ConfigScopeProperty.RESOURCE.toString()))
+ && zkClient.exists(PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE,
+ clusterName))
+ && zkClient.exists(PropertyPathConfig.getPath(PropertyType.LIVEINSTANCES,
+ clusterName))
+ && zkClient.exists(PropertyPathConfig.getPath(PropertyType.INSTANCES,
+ clusterName))
+ && zkClient.exists(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW,
+ clusterName))
+ && zkClient.exists(PropertyPathConfig.getPath(PropertyType.CONTROLLER,
+ clusterName))
+ && zkClient.exists(PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS,
+ clusterName))
+ && zkClient.exists(PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER,
+ clusterName))
+ && zkClient.exists(PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER,
+ clusterName))
+ && zkClient.exists(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER,
+ clusterName))
+ && zkClient.exists(PropertyPathConfig.getPath(PropertyType.HISTORY,
+ clusterName));
+
+ return isValid;
+ }
+
+ public static void createChildren(ZkClient client,
+ String parentPath,
+ List<ZNRecord> list)
+ {
+ client.createPersistent(parentPath, true);
+ if (list != null)
+ {
+ for (ZNRecord record : list)
+ {
+ createChildren(client, parentPath, record);
+ }
+ }
+ }
+
+ public static void createChildren(ZkClient client,
+ String parentPath,
+ ZNRecord nodeRecord)
+ {
+ client.createPersistent(parentPath, true);
+
+ String id = nodeRecord.getId();
+ String temp = parentPath + "/" + id;
+ client.createPersistent(temp, nodeRecord);
+ }
+
+ public static void dropChildren(ZkClient client, String parentPath, List<ZNRecord> list)
+ {
+ // TODO: check if parentPath exists
+ if (list != null)
+ {
+ for (ZNRecord record : list)
+ {
+ dropChildren(client, parentPath, record);
+ }
+ }
+ }
+
+ public static void dropChildren(ZkClient client, String parentPath, ZNRecord nodeRecord)
+ {
+ // TODO: check if parentPath exists
+ String id = nodeRecord.getId();
+ String temp = parentPath + "/" + id;
+ client.deleteRecursive(temp);
+ }
+
+ public static List<ZNRecord> getChildren(ZkClient client, String path)
+ {
+ // parent watch will be set by zkClient
+ List<String> children = client.getChildren(path);
+ if (children == null || children.size() == 0)
+ {
+ return Collections.emptyList();
+ }
+
+ List<ZNRecord> childRecords = new ArrayList<ZNRecord>();
+ for (String child : children)
+ {
+ String childPath = path + "/" + child;
+ Stat newStat = new Stat();
+ ZNRecord record = client.readDataAndStat(childPath, newStat, true);
+ if (record != null)
+ {
+ record.setVersion(newStat.getVersion());
+ record.setCreationTime(newStat.getCtime());
+ record.setModifiedTime(newStat.getMtime());
+ childRecords.add(record);
+ }
+ }
+ return childRecords;
+ }
+
+ public static void updateIfExists(ZkClient client,
+ String path,
+ final ZNRecord record,
+ boolean mergeOnUpdate)
+ {
+ if (client.exists(path))
+ {
+ DataUpdater<Object> updater = new DataUpdater<Object>()
+ {
+ @Override
+ public Object update(Object currentData)
+ {
+ return record;
+ }
+ };
+ client.updateDataSerialized(path, updater);
+ }
+ }
+
+ public static void createOrUpdate(ZkClient client,
+ String path,
+ final ZNRecord record,
+ final boolean persistent,
+ final boolean mergeOnUpdate)
+ {
+ int retryCount = 0;
+ while (retryCount < RETRYLIMIT)
+ {
+ try
+ {
+ if (client.exists(path))
+ {
+ DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>()
+ {
+ @Override
+ public ZNRecord update(ZNRecord currentData)
+ {
+ if (currentData != null && mergeOnUpdate)
+ {
+ currentData.merge(record);
+ return currentData;
+ }
+ return record;
+ }
+ };
+ client.updateDataSerialized(path, updater);
+ }
+ else
+ {
+ CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
+ if (record.getDeltaList().size() > 0)
+ {
+ ZNRecord value = new ZNRecord(record.getId());
+ value.merge(record);
+ client.create(path, value, mode);
+ }
+ else
+ {
+ client.create(path, record, mode);
+ }
+ }
+ break;
+ }
+ catch (Exception e)
+ {
+ retryCount = retryCount + 1;
+ logger.warn("Exception trying to update " + path + " Exception:" + e.getMessage()
+ + ". Will retry.");
+ }
+ }
+ }
+
+ public static void asyncCreateOrUpdate(ZkClient client,
+ String path,
+ final ZNRecord record,
+ final boolean persistent,
+ final boolean mergeOnUpdate)
+ {
+ try
+ {
+ if (client.exists(path))
+ {
+ if (mergeOnUpdate)
+ {
+ ZNRecord curRecord = client.readData(path);
+ if (curRecord != null)
+ {
+ curRecord.merge(record);
+ client.asyncSetData(path, curRecord, -1, null);
+ }
+ else
+ {
+ client.asyncSetData(path, record, -1, null);
+ }
+ }
+ else
+ {
+ client.asyncSetData(path, record, -1, null);
+ }
+ }
+ else
+ {
+ CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
+ if (record.getDeltaList().size() > 0)
+ {
+ ZNRecord newRecord = new ZNRecord(record.getId());
+ newRecord.merge(record);
+ client.create(path, null, mode);
+
+ client.asyncSetData(path, newRecord, -1, null);
+ }
+ else
+ {
+ client.create(path, null, mode);
+
+ client.asyncSetData(path, record, -1, null);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ logger.error("Exception in async create or update " + path + ". Exception: "
+ + e.getMessage() + ". Give up.");
+ }
+ }
+
+ public static void createOrReplace(ZkClient client,
+ String path,
+ final ZNRecord record,
+ final boolean persistent)
+ {
+ int retryCount = 0;
+ while (retryCount < RETRYLIMIT)
+ {
+ try
+ {
+ if (client.exists(path))
+ {
+ DataUpdater<Object> updater = new DataUpdater<Object>()
+ {
+ @Override
+ public Object update(Object currentData)
+ {
+ return record;
+ }
+ };
+ client.updateDataSerialized(path, updater);
+ }
+ else
+ {
+ CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
+ client.create(path, record, mode);
+ }
+ break;
+ }
+ catch (Exception e)
+ {
+ retryCount = retryCount + 1;
+ logger.warn("Exception trying to createOrReplace " + path + " Exception:"
+ + e.getMessage() + ". Will retry.");
+ }
+ }
+ }
+
+ public static void subtract(ZkClient client,
+ String path,
+ final ZNRecord recordTosubtract)
+ {
+ int retryCount = 0;
+ while (retryCount < RETRYLIMIT)
+ {
+ try
+ {
+ if (client.exists(path))
+ {
+ DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>()
+ {
+ @Override
+ public ZNRecord update(ZNRecord currentData)
+ {
+ currentData.subtract(recordTosubtract);
+ return currentData;
+ }
+ };
+ client.updateDataSerialized(path, updater);
+ break;
+ }
+ }
+ catch (Exception e)
+ {
+ retryCount = retryCount + 1;
+ logger.warn("Exception trying to createOrReplace " + path + " Exception:"
+ + e.getMessage() + ". Will retry.");
+ e.printStackTrace();
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
new file mode 100644
index 0000000..9dd8bde
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
@@ -0,0 +1,133 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.io.ByteArrayInputStream;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.Map;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+
+public class ZNRecordSerializer implements ZkSerializer
+{
+ private static Logger logger = Logger.getLogger(ZNRecordSerializer.class);
+
+ private static int getListFieldBound(ZNRecord record)
+ {
+ int max = Integer.MAX_VALUE;
+ if (record.getSimpleFields().containsKey(ZNRecord.LIST_FIELD_BOUND))
+ {
+ String maxStr = record.getSimpleField(ZNRecord.LIST_FIELD_BOUND);
+ try
+ {
+ max = Integer.parseInt(maxStr);
+ }
+ catch (Exception e)
+ {
+ logger.error("IllegalNumberFormat for list field bound: " + maxStr);
+ }
+ }
+ return max;
+ }
+
+ @Override
+ public byte[] serialize(Object data)
+ {
+ if (!(data instanceof ZNRecord))
+ {
+ // null is NOT an instance of any class
+ logger.error("Input object must be of type ZNRecord but it is " + data + ". Will not write to zk");
+ throw new HelixException("Input object is not of type ZNRecord (was " + data + ")");
+ }
+
+ ZNRecord record = (ZNRecord) data;
+
+ // apply retention policy
+ int max = getListFieldBound(record);
+ if (max < Integer.MAX_VALUE)
+ {
+ Map<String, List<String>> listMap = record.getListFields();
+ for (String key : listMap.keySet())
+ {
+ List<String> list = listMap.get(key);
+ if (list.size() > max)
+ {
+ listMap.put(key, list.subList(0, max));
+ }
+ }
+ }
+
+ // do serialization
+ ObjectMapper mapper = new ObjectMapper();
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+ serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+ serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+ StringWriter sw = new StringWriter();
+ try
+ {
+ mapper.writeValue(sw, data);
+ } catch (Exception e)
+ {
+ logger.error("Exception during data serialization. Will not write to zk. Data (first 1k): "
+ + sw.toString().substring(0, 1024), e);
+ throw new HelixException(e);
+ }
+
+ if (sw.toString().getBytes().length > ZNRecord.SIZE_LIMIT)
+ {
+ logger.error("Data size larger than 1M, ZNRecord.id: " + record.getId()
+ + ". Will not write to zk. Data (first 1k): " + sw.toString().substring(0, 1024));
+ throw new HelixException("Data size larger than 1M, ZNRecord.id: " + record.getId());
+ }
+ return sw.toString().getBytes();
+ }
+
+ @Override
+ public Object deserialize(byte[] bytes)
+ {
+ if (bytes == null || bytes.length == 0)
+ {
+ // reading a parent/null node
+ return null;
+ }
+
+ ObjectMapper mapper = new ObjectMapper();
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+
+ DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
+ deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+ deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
+ deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
+ try
+ {
+ ZNRecord zn = mapper.readValue(bais, ZNRecord.class);
+ return zn;
+ } catch (Exception e)
+ {
+ logger.error("Exception during deserialization of bytes: " + new String(bytes), e);
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
new file mode 100644
index 0000000..9f9680e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
@@ -0,0 +1,300 @@
+package org.apache.helix.manager.zk;
+
+import java.io.ByteArrayInputStream;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+
+
+public class ZNRecordStreamingSerializer implements ZkSerializer
+{
+ private static Logger LOG = Logger.getLogger(ZNRecordStreamingSerializer.class);
+
+ private static int getListFieldBound(ZNRecord record)
+ {
+ int max = Integer.MAX_VALUE;
+ if (record.getSimpleFields().containsKey(ZNRecord.LIST_FIELD_BOUND))
+ {
+ String maxStr = record.getSimpleField(ZNRecord.LIST_FIELD_BOUND);
+ try
+ {
+ max = Integer.parseInt(maxStr);
+ }
+ catch (Exception e)
+ {
+ LOG.error("IllegalNumberFormat for list field bound: " + maxStr);
+ }
+ }
+ return max;
+ }
+
+ @Override
+ public byte[] serialize(Object data) throws ZkMarshallingError
+ {
+ if (!(data instanceof ZNRecord))
+ {
+ // null is NOT an instance of any class
+ LOG.error("Input object must be of type ZNRecord but it is " + data + ". Will not write to zk");
+ throw new HelixException("Input object is not of type ZNRecord (was " + data + ")");
+ }
+
+ // apply retention policy on list field
+ ZNRecord record = (ZNRecord) data;
+ int max = getListFieldBound(record);
+ if (max < Integer.MAX_VALUE)
+ {
+ Map<String, List<String>> listMap = record.getListFields();
+ for (String key : listMap.keySet())
+ {
+ List<String> list = listMap.get(key);
+ if (list.size() > max)
+ {
+ listMap.put(key, list.subList(0, max));
+ }
+ }
+ }
+
+ StringWriter sw = new StringWriter();
+ try
+ {
+ JsonFactory f = new JsonFactory();
+ JsonGenerator g = f.createJsonGenerator(sw);
+
+ g.writeStartObject();
+
+ // write id field
+ g.writeRaw("\n ");
+ g.writeStringField("id", record.getId());
+
+ // write simepleFields
+ g.writeRaw("\n ");
+ g.writeObjectFieldStart("simpleFields");
+ for (String key : record.getSimpleFields().keySet())
+ {
+ g.writeRaw("\n ");
+ g.writeStringField(key, record.getSimpleField(key));
+ }
+ g.writeRaw("\n ");
+ g.writeEndObject(); // for simpleFields
+
+ // write listFields
+ g.writeRaw("\n ");
+ g.writeObjectFieldStart("listFields");
+ for (String key : record.getListFields().keySet())
+ {
+ // g.writeStringField(key, record.getListField(key).toString());
+
+ // g.writeObjectFieldStart(key);
+ g.writeRaw("\n ");
+ g.writeArrayFieldStart(key);
+ List<String> list = record.getListField(key);
+ for (String listValue : list)
+ {
+ g.writeString(listValue);
+ }
+ // g.writeEndObject();
+ g.writeEndArray();
+
+ }
+ g.writeRaw("\n ");
+ g.writeEndObject(); // for listFields
+
+ // write mapFields
+ g.writeRaw("\n ");
+ g.writeObjectFieldStart("mapFields");
+ for (String key : record.getMapFields().keySet())
+ {
+ // g.writeStringField(key, record.getMapField(key).toString());
+ g.writeRaw("\n ");
+ g.writeObjectFieldStart(key);
+ Map<String, String> map = record.getMapField(key);
+ for (String mapKey : map.keySet())
+ {
+ g.writeRaw("\n ");
+ g.writeStringField(mapKey, map.get(mapKey));
+ }
+ g.writeRaw("\n ");
+ g.writeEndObject();
+
+ }
+ g.writeRaw("\n ");
+ g.writeEndObject(); // for mapFields
+
+ g.writeRaw("\n");
+ g.writeEndObject(); // for whole znrecord
+
+ // important: will force flushing of output, close underlying output
+ // stream
+ g.close();
+ }
+ catch (Exception e)
+ {
+ LOG.error("Exception during data serialization. Will not write to zk. Data (first 1k): "
+ + sw.toString().substring(0, 1024), e);
+ throw new HelixException(e);
+ }
+
+ // check size
+ if (sw.toString().getBytes().length > ZNRecord.SIZE_LIMIT)
+ {
+ LOG.error("Data size larger than 1M, ZNRecord.id: " + record.getId()
+ + ". Will not write to zk. Data (first 1k): " + sw.toString().substring(0, 1024));
+ throw new HelixException("Data size larger than 1M, ZNRecord.id: " + record.getId());
+ }
+
+ return sw.toString().getBytes();
+ }
+
+ @Override
+ public Object deserialize(byte[] bytes) throws ZkMarshallingError
+ {
+ if (bytes == null || bytes.length == 0)
+ {
+ LOG.error("ZNode is empty.");
+ return null;
+ }
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ ZNRecord record = null;
+
+ try
+ {
+ JsonFactory f = new JsonFactory();
+ JsonParser jp = f.createJsonParser(bais);
+
+ jp.nextToken(); // will return JsonToken.START_OBJECT (verify?)
+ while (jp.nextToken() != JsonToken.END_OBJECT)
+ {
+ String fieldname = jp.getCurrentName();
+ jp.nextToken(); // move to value, or START_OBJECT/START_ARRAY
+ if ("id".equals(fieldname))
+ {
+ // contains an object
+ record = new ZNRecord(jp.getText());
+ }
+ else if ("simpleFields".equals(fieldname))
+ {
+ while (jp.nextToken() != JsonToken.END_OBJECT)
+ {
+ String key = jp.getCurrentName();
+ jp.nextToken(); // move to value
+ record.setSimpleField(key, jp.getText());
+ }
+ }
+ else if ("mapFields".equals(fieldname))
+ {
+ // user.setVerified(jp.getCurrentToken() == JsonToken.VALUE_TRUE);
+ while (jp.nextToken() != JsonToken.END_OBJECT)
+ {
+ String key = jp.getCurrentName();
+ record.setMapField(key, new TreeMap<String, String>());
+ jp.nextToken(); // move to value
+
+ while (jp.nextToken() != JsonToken.END_OBJECT)
+ {
+ String mapKey = jp.getCurrentName();
+ jp.nextToken(); // move to value
+ record.getMapField(key).put(mapKey, jp.getText());
+ }
+ }
+
+ }
+ else if ("listFields".equals(fieldname))
+ {
+ // user.setUserImage(jp.getBinaryValue());
+ while (jp.nextToken() != JsonToken.END_OBJECT)
+ {
+ String key = jp.getCurrentName();
+ record.setListField(key, new ArrayList<String>());
+ jp.nextToken(); // move to value
+ while (jp.nextToken() != JsonToken.END_ARRAY)
+ {
+ record.getListField(key).add(jp.getText());
+ }
+
+ }
+
+ }
+ else
+ {
+ throw new IllegalStateException("Unrecognized field '" + fieldname + "'!");
+ }
+ }
+ jp.close(); // ensure resources get cleaned up timely and properly
+ }
+ catch (Exception e)
+ {
+ LOG.error("Exception during deserialization of bytes: " + new String(bytes), e);
+ }
+
+ return record;
+ }
+
+ public static void main(String[] args)
+ {
+ ZNRecord record = new ZNRecord("record");
+ final int recordSize = 10;
+ for (int i = 0; i < recordSize; i++)
+ {
+ record.setSimpleField("" + i, "" + i);
+ record.setListField("" + i, new ArrayList<String>());
+ for (int j = 0; j < recordSize; j++)
+ {
+ record.getListField("" + i).add("" + j);
+ }
+
+ record.setMapField("" + i, new TreeMap<String, String>());
+ for (int j = 0; j < recordSize; j++)
+ {
+ record.getMapField("" + i).put("" + j, "" + j);
+ }
+ }
+
+ ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
+ byte[] bytes = serializer.serialize(record);
+ System.out.println(new String(bytes));
+ ZNRecord record2 = (ZNRecord) serializer.deserialize(bytes);
+ System.out.println(record2);
+
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < 100; i++)
+ {
+ bytes = serializer.serialize(record);
+ // System.out.println(new String(bytes));
+ record2 = (ZNRecord) serializer.deserialize(bytes);
+ // System.out.println(record2);
+ }
+ long end = System.currentTimeMillis();
+ System.out.println("ZNRecordStreamingSerializer time used: " + (end - start));
+
+ ZNRecordSerializer serializer2 = new ZNRecordSerializer();
+ bytes = serializer2.serialize(record);
+ // System.out.println(new String(bytes));
+ record2 = (ZNRecord) serializer2.deserialize(bytes);
+ // System.out.println(record2);
+
+ start = System.currentTimeMillis();
+ for (int i = 0; i < 100; i++)
+ {
+ bytes = serializer2.serialize(record);
+ // System.out.println(new String(bytes));
+ record2 = (ZNRecord) serializer2.deserialize(bytes);
+ // System.out.println(record2);
+ }
+ end = System.currentTimeMillis();
+ System.out.println("ZNRecordSerializer time used: " + (end - start));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
new file mode 100644
index 0000000..f412ce9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
@@ -0,0 +1,171 @@
+package org.apache.helix.manager.zk;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+
+public class ZkAsyncCallbacks
+{
+ private static Logger LOG = Logger.getLogger(ZkAsyncCallbacks.class);
+
+ static class GetDataCallbackHandler extends DefaultCallback implements DataCallback
+ {
+ byte[] _data;
+ Stat _stat;
+
+ @Override
+ public void handle()
+ {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat)
+ {
+ if (rc == 0)
+ {
+ _data = data;
+ _stat = stat;
+ }
+ callback(rc, path, ctx);
+ }
+ }
+
+ static class SetDataCallbackHandler extends DefaultCallback implements StatCallback
+ {
+ Stat _stat;
+
+ @Override
+ public void handle()
+ {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void processResult(int rc, String path, Object ctx, Stat stat)
+ {
+ if (rc == 0)
+ {
+ _stat = stat;
+ }
+ callback(rc, path, ctx);
+ }
+
+ public Stat getStat()
+ {
+ return _stat;
+ }
+ }
+
+ static class ExistsCallbackHandler extends DefaultCallback implements StatCallback
+ {
+ Stat _stat;
+
+ @Override
+ public void handle()
+ {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void processResult(int rc, String path, Object ctx, Stat stat)
+ {
+ if (rc == 0)
+ {
+ _stat = stat;
+ }
+ callback(rc, path, ctx);
+ }
+
+ }
+
+ static class CreateCallbackHandler extends DefaultCallback implements StringCallback
+ {
+ @Override
+ public void processResult(int rc, String path, Object ctx, String name)
+ {
+ callback(rc, path, ctx);
+ }
+
+ @Override
+ public void handle()
+ {
+ // TODO Auto-generated method stub
+ }
+ }
+
+ static class DeleteCallbackHandler extends DefaultCallback implements VoidCallback
+ {
+ @Override
+ public void processResult(int rc, String path, Object ctx)
+ {
+ callback(rc, path, ctx);
+ }
+
+ @Override
+ public void handle()
+ {
+ // TODO Auto-generated method stub
+ }
+
+ }
+
+ /**
+ * Default callback for zookeeper async api
+ */
+ static abstract class DefaultCallback
+ {
+ AtomicBoolean _lock = new AtomicBoolean(false);
+ int _rc = -1;
+
+ public void callback(int rc, String path, Object ctx)
+ {
+ if (rc != 0)
+ {
+ LOG.warn(this + ", rc:" + Code.get(rc) + ", path: " + path);
+ }
+ _rc = rc;
+ handle();
+
+ synchronized (_lock)
+ {
+ _lock.set(true);
+ _lock.notify();
+ }
+ }
+
+ public boolean waitForSuccess()
+ {
+ try
+ {
+ synchronized (_lock)
+ {
+ while (!_lock.get())
+ {
+ _lock.wait();
+ }
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return true;
+ }
+
+ public int getRc()
+ {
+ return _rc;
+ }
+
+ abstract public void handle();
+ }
+
+}