You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC
[17/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixManager.java
deleted file mode 100644
index a8891d6..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixManager.java
+++ /dev/null
@@ -1,1098 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import static com.linkedin.helix.HelixConstants.ChangeType.CONFIG;
-import static com.linkedin.helix.HelixConstants.ChangeType.CURRENT_STATE;
-import static com.linkedin.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
-import static com.linkedin.helix.HelixConstants.ChangeType.HEALTH;
-import static com.linkedin.helix.HelixConstants.ChangeType.IDEAL_STATE;
-import static com.linkedin.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
-import static com.linkedin.helix.HelixConstants.ChangeType.MESSAGE;
-import static com.linkedin.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
-
-import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.Timer;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.TimeUnit;
-
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-import com.linkedin.helix.BaseDataAccessor;
-import com.linkedin.helix.ClusterMessagingService;
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigChangeListener;
-import com.linkedin.helix.ConfigScope.ConfigScopeProperty;
-import com.linkedin.helix.ControllerChangeListener;
-import com.linkedin.helix.CurrentStateChangeListener;
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.ExternalViewChangeListener;
-import com.linkedin.helix.HealthStateChangeListener;
-import com.linkedin.helix.HelixAdmin;
-import com.linkedin.helix.HelixConstants.ChangeType;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixTimerTask;
-import com.linkedin.helix.IdealStateChangeListener;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.LiveInstanceChangeListener;
-import com.linkedin.helix.MessageListener;
-import com.linkedin.helix.PreConnectCallback;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.controller.restlet.ZKPropertyTransferServer;
-import com.linkedin.helix.healthcheck.HealthStatsAggregationTask;
-import com.linkedin.helix.healthcheck.ParticipantHealthReportCollector;
-import com.linkedin.helix.healthcheck.ParticipantHealthReportCollectorImpl;
-import com.linkedin.helix.messaging.DefaultMessagingService;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.model.StateModelDefinition;
-import com.linkedin.helix.monitoring.ZKPathDataDumpTask;
-import com.linkedin.helix.participant.DistClusterControllerElection;
-import com.linkedin.helix.participant.HelixStateMachineEngine;
-import com.linkedin.helix.participant.StateMachineEngine;
-import com.linkedin.helix.store.PropertyStore;
-import com.linkedin.helix.store.ZNRecordJsonSerializer;
-import com.linkedin.helix.store.zk.ZKPropertyStore;
-import com.linkedin.helix.store.zk.ZkHelixPropertyStore;
-import com.linkedin.helix.tools.PropertiesReader;
-
-public class ZKHelixManager implements HelixManager
-{
- private static Logger logger =
- Logger.getLogger(ZKHelixManager.class);
- private static final int RETRY_LIMIT = 3;
- private static final int CONNECTIONTIMEOUT = 60 * 1000;
- private final String _clusterName;
- private final String _instanceName;
- private final String _zkConnectString;
- private static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
- private ZKDataAccessor _accessor;
- private ZKHelixDataAccessor _helixAccessor;
- private ConfigAccessor _configAccessor;
- protected ZkClient _zkClient;
- private final List<CallbackHandler> _handlers;
- private final ZkStateChangeListener _zkStateChangeListener;
- private final InstanceType _instanceType;
- volatile String _sessionId;
- private Timer _timer;
- private CallbackHandler _leaderElectionHandler;
- private ParticipantHealthReportCollectorImpl _participantHealthCheckInfoCollector;
- private final DefaultMessagingService _messagingService;
- private ZKHelixAdmin _managementTool;
- private final String _version;
- private final StateMachineEngine _stateMachEngine;
- private int _sessionTimeout;
- private PropertyStore<ZNRecord> _propertyStore;
- private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
- private final List<HelixTimerTask> _controllerTimerTasks;
- private BaseDataAccessor<ZNRecord> _baseDataAccessor;
- List<PreConnectCallback> _preConnectCallbacks =
- new LinkedList<PreConnectCallback>();
- ZKPropertyTransferServer _transferServer = null;
-
- public ZKHelixManager(String clusterName,
- String instanceName,
- InstanceType instanceType,
- String zkConnectString) throws Exception
- {
- logger.info("Create a zk-based cluster manager. clusterName:" + clusterName
- + ", instanceName:" + instanceName + ", type:" + instanceType + ", zkSvr:"
- + zkConnectString);
- int sessionTimeoutInt = -1;
- try
- {
- sessionTimeoutInt =
- Integer.parseInt(System.getProperty("zk.session.timeout", ""
- + DEFAULT_SESSION_TIMEOUT));
- }
- catch (NumberFormatException e)
- {
- logger.warn("Exception while parsing session timeout: "
- + System.getProperty("zk.session.timeout", "" + DEFAULT_SESSION_TIMEOUT));
- }
- if (sessionTimeoutInt > 0)
- {
- _sessionTimeout = sessionTimeoutInt;
- }
- else
- {
- _sessionTimeout = DEFAULT_SESSION_TIMEOUT;
- }
- if (instanceName == null)
- {
- try
- {
- instanceName =
- InetAddress.getLocalHost().getCanonicalHostName() + "-"
- + instanceType.toString();
- }
- catch (UnknownHostException e)
- {
- // can ignore it
- logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable",
- e);
- instanceName = "UNKNOWN";
- }
- }
-
- _clusterName = clusterName;
- _instanceName = instanceName;
- _instanceType = instanceType;
- _zkConnectString = zkConnectString;
- _zkStateChangeListener = new ZkStateChangeListener(this);
- _timer = null;
-
- _handlers = new ArrayList<CallbackHandler>();
-
- _messagingService = new DefaultMessagingService(this);
-
- _version =
- new PropertiesReader("cluster-manager-version.properties").getProperty("clustermanager.version");
-
- _stateMachEngine = new HelixStateMachineEngine(this);
-
- // add all timer tasks
- _controllerTimerTasks = new ArrayList<HelixTimerTask>();
- if (_instanceType == InstanceType.CONTROLLER)
- {
- _controllerTimerTasks.add(new HealthStatsAggregationTask(this));
- }
- }
-
- private boolean isInstanceSetup()
- {
- if (_instanceType == InstanceType.PARTICIPANT
- || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
- {
- boolean isValid =
- _zkClient.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS,
- _clusterName,
- ConfigScopeProperty.PARTICIPANT.toString(),
- _instanceName))
- && _zkClient.exists(PropertyPathConfig.getPath(PropertyType.MESSAGES,
- _clusterName,
- _instanceName))
- && _zkClient.exists(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
- _clusterName,
- _instanceName))
- && _zkClient.exists(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES,
- _clusterName,
- _instanceName))
- && _zkClient.exists(PropertyPathConfig.getPath(PropertyType.ERRORS,
- _clusterName,
- _instanceName));
-
- return isValid;
- }
- return true;
- }
-
- @Override
- public void addIdealStateChangeListener(final IdealStateChangeListener listener) throws Exception
- {
- logger.info("ClusterManager.addIdealStateChangeListener()");
- checkConnected();
- final String path =
- PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName);
- CallbackHandler callbackHandler =
- createCallBackHandler(path,
- listener,
- new EventType[] { EventType.NodeDataChanged,
- EventType.NodeDeleted, EventType.NodeCreated },
- IDEAL_STATE);
- addListener(callbackHandler);
- }
-
- @Override
- public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception
- {
- logger.info("ClusterManager.addLiveInstanceChangeListener()");
- checkConnected();
- final String path = _helixAccessor.keyBuilder().liveInstances().getPath();
- CallbackHandler callbackHandler =
- createCallBackHandler(path,
- listener,
- new EventType[] { EventType.NodeDataChanged, EventType.NodeChildrenChanged,
- EventType.NodeDeleted, EventType.NodeCreated },
- LIVE_INSTANCE);
- addListener(callbackHandler);
- }
-
- @Override
- public void addConfigChangeListener(ConfigChangeListener listener)
- {
- logger.info("ClusterManager.addConfigChangeListener()");
- checkConnected();
- final String path =
- PropertyPathConfig.getPath(PropertyType.CONFIGS,
- _clusterName,
- ConfigScopeProperty.PARTICIPANT.toString());
-
- CallbackHandler callbackHandler =
- createCallBackHandler(path,
- listener,
- new EventType[] { EventType.NodeChildrenChanged },
- CONFIG);
- addListener(callbackHandler);
-
- }
-
- // TODO: Decide if do we still need this since we are exposing
- // ClusterMessagingService
- @Override
- public void addMessageListener(MessageListener listener, String instanceName)
- {
- logger.info("ClusterManager.addMessageListener() " + instanceName);
- checkConnected();
- final String path = _helixAccessor.keyBuilder().messages(instanceName).getPath();
- CallbackHandler callbackHandler =
- createCallBackHandler(path,
- listener,
- new EventType[] { EventType.NodeChildrenChanged,
- EventType.NodeDeleted, EventType.NodeCreated },
- MESSAGE);
- addListener(callbackHandler);
- }
-
- void addControllerMessageListener(MessageListener listener)
- {
- logger.info("ClusterManager.addControllerMessageListener()");
- checkConnected();
- final String path = _helixAccessor.keyBuilder().controllerMessages().getPath();
-
- CallbackHandler callbackHandler =
- createCallBackHandler(path,
- listener,
- new EventType[] { EventType.NodeChildrenChanged,
- EventType.NodeDeleted, EventType.NodeCreated },
- MESSAGES_CONTROLLER);
- addListener(callbackHandler);
- }
-
- @Override
- public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
- String instanceName,
- String sessionId)
- {
- logger.info("ClusterManager.addCurrentStateChangeListener() " + instanceName + " "
- + sessionId);
- checkConnected();
- final String path =
- _helixAccessor.keyBuilder().currentStates(instanceName, sessionId).getPath();
-
- CallbackHandler callbackHandler =
- createCallBackHandler(path,
- listener,
- new EventType[] { EventType.NodeChildrenChanged,
- EventType.NodeDeleted, EventType.NodeCreated },
- CURRENT_STATE);
- addListener(callbackHandler);
- }
-
- @Override
- public void addHealthStateChangeListener(HealthStateChangeListener listener,
- String instanceName)
- {
- // System.out.println("ZKClusterManager.addHealthStateChangeListener()");
- // TODO: re-form this for stats checking
- logger.info("ClusterManager.addHealthStateChangeListener()" + instanceName);
- checkConnected();
- final String path = _helixAccessor.keyBuilder().healthReports(instanceName).getPath();
-
- CallbackHandler callbackHandler =
- createCallBackHandler(path, listener, new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDataChanged,
- EventType.NodeDeleted, EventType.NodeCreated }, HEALTH);
- addListener(callbackHandler);
- }
-
- @Override
- public void addExternalViewChangeListener(ExternalViewChangeListener listener)
- {
- logger.info("ClusterManager.addExternalViewChangeListener()");
- checkConnected();
- final String path = _helixAccessor.keyBuilder().externalViews().getPath();
-
- CallbackHandler callbackHandler =
- createCallBackHandler(path,
- listener,
- new EventType[] { EventType.NodeDataChanged,
- EventType.NodeDeleted, EventType.NodeCreated },
- EXTERNAL_VIEW);
- addListener(callbackHandler);
- }
-
- @Override
- public DataAccessor getDataAccessor()
- {
- checkConnected();
- return _accessor;
- }
-
- @Override
- public HelixDataAccessor getHelixDataAccessor()
- {
- checkConnected();
- return _helixAccessor;
- }
-
- @Override
- public ConfigAccessor getConfigAccessor()
- {
- checkConnected();
- return _configAccessor;
- }
-
- @Override
- public String getClusterName()
- {
- return _clusterName;
- }
-
- @Override
- public String getInstanceName()
- {
- return _instanceName;
- }
-
- @Override
- public void connect() throws Exception
- {
- logger.info("ClusterManager.connect()");
- if (_zkStateChangeListener.isConnected())
- {
- logger.warn("Cluster manager " + _clusterName + " " + _instanceName
- + " already connected");
- return;
- }
-
- try
- {
- createClient(_zkConnectString);
- _messagingService.onConnected();
- }
- catch (Exception e)
- {
- logger.error(e);
- disconnect();
- throw e;
- }
- }
-
- @Override
- public void disconnect()
- {
-
- if (!isConnected())
- {
- logger.warn("ClusterManager " + _instanceName + " already disconnected");
- return;
- }
-
- logger.info("disconnect " + _instanceName + "(" + _instanceType + ") from "
- + _clusterName);
-
- /**
- * shutdown thread pool first to avoid reset() being invoked in the middle of state
- * transition
- */
- _messagingService.getExecutor().shutDown();
- resetHandlers();
-
- _helixAccessor.shutdown();
-
- if (_leaderElectionHandler != null)
- {
- _leaderElectionHandler.reset();
- }
-
- if (_participantHealthCheckInfoCollector != null)
- {
- _participantHealthCheckInfoCollector.stop();
- }
-
- if (_timer != null)
- {
- _timer.cancel();
- _timer = null;
- }
-
- if (_instanceType == InstanceType.CONTROLLER)
- {
- stopTimerTasks();
- }
-
- if (_propertyStore != null)
- {
- _propertyStore.stop();
- }
-
- // unsubscribe accessor from controllerChange
- _zkClient.unsubscribeAll();
-
- _zkClient.close();
-
- // HACK seems that zkClient is not sending DISCONNECT event
- _zkStateChangeListener.disconnect();
- logger.info("Cluster manager: " + _instanceName + " disconnected");
- }
-
- @Override
- public String getSessionId()
- {
- checkConnected();
- return _sessionId;
- }
-
- @Override
- public boolean isConnected()
- {
- return _zkStateChangeListener.isConnected();
- }
-
- @Override
- public long getLastNotificationTime()
- {
- return -1;
- }
-
- @Override
- public void addControllerListener(ControllerChangeListener listener)
- {
- checkConnected();
- final String path = _helixAccessor.keyBuilder().controller().getPath();
- logger.info("Add controller listener at: " + path);
- CallbackHandler callbackHandler =
- createCallBackHandler(path,
- listener,
- new EventType[] { EventType.NodeChildrenChanged,
- EventType.NodeDeleted, EventType.NodeCreated },
- ChangeType.CONTROLLER);
-
- // System.out.println("add controller listeners to " + _instanceName +
- // " for " + _clusterName);
- // _handlers.add(callbackHandler);
- addListener(callbackHandler);
- }
-
- @Override
- public boolean removeListener(Object listener)
- {
- logger.info("remove listener: " + listener + " from " + _instanceName);
-
- synchronized (this)
- {
- Iterator<CallbackHandler> iterator = _handlers.iterator();
- while (iterator.hasNext())
- {
- CallbackHandler handler = iterator.next();
- // simply compare reference
- if (handler.getListener().equals(listener))
- {
- handler.reset();
- iterator.remove();
- }
- }
- }
-
- return true;
- }
-
- private void addLiveInstance()
- {
- LiveInstance liveInstance = new LiveInstance(_instanceName);
- liveInstance.setSessionId(_sessionId);
- liveInstance.setHelixVersion(_version);
- liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
-
- logger.info("Add live instance: InstanceName: " + _instanceName + " Session id:"
- + _sessionId);
- Builder keyBuilder = _helixAccessor.keyBuilder();
- if (!_helixAccessor.createProperty(keyBuilder.liveInstance(_instanceName),
- liveInstance))
- {
- String errorMsg =
- "Fail to create live instance node after waiting, so quit. instance:"
- + _instanceName;
- logger.warn(errorMsg);
- throw new HelixException(errorMsg);
-
- }
- String currentStatePathParent =
- PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
- _clusterName,
- _instanceName,
- getSessionId());
-
- if (!_zkClient.exists(currentStatePathParent))
- {
- _zkClient.createPersistent(currentStatePathParent);
- logger.info("Creating current state path " + currentStatePathParent);
- }
- }
-
- private void startStatusUpdatedumpTask()
- {
- long initialDelay = 30 * 60 * 1000;
- long period = 120 * 60 * 1000;
- int timeThresholdNoChange = 180 * 60 * 1000;
-
- if (_timer == null)
- {
- _timer = new Timer(true);
- _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(this,
- _zkClient,
- timeThresholdNoChange),
- initialDelay,
- period);
- }
- }
-
- private void createClient(String zkServers) throws Exception
- {
- String propertyStorePath =
- PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
-
- // by default use ZNRecordStreamingSerializer except for paths within the property
- // store which expects raw byte[] serialization/deserialization
- PathBasedZkSerializer zkSerializer =
- ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer())
- .serialize(propertyStorePath, new ByteArraySerializer())
- .build();
-
- _zkClient = new ZkClient(zkServers, _sessionTimeout, CONNECTIONTIMEOUT, zkSerializer);
- _accessor = new ZKDataAccessor(_clusterName, _zkClient);
-
- ZkBaseDataAccessor<ZNRecord> baseDataAccessor =
- new ZkBaseDataAccessor<ZNRecord>(_zkClient);
- if (_instanceType == InstanceType.PARTICIPANT)
- {
- String curStatePath =
- PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
- _clusterName,
- _instanceName);
- _baseDataAccessor =
- new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor,
- Arrays.asList(curStatePath));
- }
- else if (_instanceType == InstanceType.CONTROLLER)
- {
- String extViewPath = PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW,
-
- _clusterName);
- _baseDataAccessor =
- new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor,
- Arrays.asList(extViewPath));
-
- }
- else
- {
- _baseDataAccessor = baseDataAccessor;
- }
-
- _helixAccessor =
- new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor);
- _configAccessor = new ConfigAccessor(_zkClient);
- int retryCount = 0;
-
- _zkClient.subscribeStateChanges(_zkStateChangeListener);
- while (retryCount < RETRY_LIMIT)
- {
- try
- {
- _zkClient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS);
- _zkStateChangeListener.handleStateChanged(KeeperState.SyncConnected);
- _zkStateChangeListener.handleNewSession();
- break;
- }
- catch (HelixException e)
- {
- logger.error("fail to createClient.", e);
- throw e;
- }
- catch (Exception e)
- {
- retryCount++;
-
- logger.error("fail to createClient. retry " + retryCount, e);
- if (retryCount == RETRY_LIMIT)
- {
- throw e;
- }
- }
- }
- }
-
- private CallbackHandler createCallBackHandler(String path,
- Object listener,
- EventType[] eventTypes,
- ChangeType changeType)
- {
- if (listener == null)
- {
- throw new HelixException("Listener cannot be null");
- }
- return new CallbackHandler(this, _zkClient, path, listener, eventTypes, changeType);
- }
-
- /**
- * This will be invoked when ever a new session is created<br/>
- *
- * case 1: the cluster manager was a participant carry over current state, add live
- * instance, and invoke message listener; case 2: the cluster manager was controller and
- * was a leader before do leader election, and if it becomes leader again, invoke ideal
- * state listener, current state listener, etc. if it fails to become leader in the new
- * session, then becomes standby; case 3: the cluster manager was controller and was NOT
- * a leader before do leader election, and if it becomes leader, instantiate and invoke
- * ideal state listener, current state listener, etc. if if fails to become leader in
- * the new session, stay as standby
- */
-
- protected void handleNewSession()
- {
- boolean isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS);
- while (!isConnected)
- {
- logger.error("Could NOT connect to zk server in " + CONNECTIONTIMEOUT + "ms. zkServer: "
- + _zkConnectString + ", expiredSessionId: " + _sessionId + ", clusterName: "
- + _clusterName);
- isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS);
- }
-
- ZkConnection zkConnection = ((ZkConnection) _zkClient.getConnection());
-
- synchronized (this)
- {
- _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
- }
- _accessor.reset();
- _baseDataAccessor.reset();
-
- resetHandlers();
-
- logger.info("Handling new session, session id:" + _sessionId + ", instance:"
- + _instanceName + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName);
-
- logger.info(zkConnection.getZookeeper());
-
- if (!ZKUtil.isClusterSetup(_clusterName, _zkClient))
- {
- throw new HelixException("Initial cluster structure is not set up for cluster:"
- + _clusterName);
- }
- if (!isInstanceSetup())
- {
- throw new HelixException("Initial cluster structure is not set up for instance:"
- + _instanceName + " instanceType:" + _instanceType);
- }
-
- if (_instanceType == InstanceType.PARTICIPANT
- || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
- {
- handleNewSessionAsParticipant();
- }
-
- if (_instanceType == InstanceType.CONTROLLER
- || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
- {
- addControllerMessageListener(_messagingService.getExecutor());
- MessageHandlerFactory defaultControllerMsgHandlerFactory =
- new DefaultControllerMessageHandlerFactory();
- _messagingService.getExecutor()
- .registerMessageHandlerFactory(defaultControllerMsgHandlerFactory.getMessageType(),
- defaultControllerMsgHandlerFactory);
- MessageHandlerFactory defaultSchedulerMsgHandlerFactory =
- new DefaultSchedulerMessageHandlerFactory(this);
- _messagingService.getExecutor()
- .registerMessageHandlerFactory(defaultSchedulerMsgHandlerFactory.getMessageType(),
- defaultSchedulerMsgHandlerFactory);
- MessageHandlerFactory defaultParticipantErrorMessageHandlerFactory =
- new DefaultParticipantErrorMessageHandlerFactory(this);
- _messagingService.getExecutor()
- .registerMessageHandlerFactory(defaultParticipantErrorMessageHandlerFactory.getMessageType(),
- defaultParticipantErrorMessageHandlerFactory);
-
- if (_leaderElectionHandler == null)
- {
- final String path =
- PropertyPathConfig.getPath(PropertyType.CONTROLLER, _clusterName);
-
- _leaderElectionHandler =
- createCallBackHandler(path,
- new DistClusterControllerElection(_zkConnectString),
- new EventType[] { EventType.NodeChildrenChanged,
- EventType.NodeDeleted, EventType.NodeCreated },
- ChangeType.CONTROLLER);
- }
- else
- {
- _leaderElectionHandler.init();
- }
- }
-
- if (_instanceType == InstanceType.PARTICIPANT
- || _instanceType == InstanceType.CONTROLLER_PARTICIPANT
- || (_instanceType == InstanceType.CONTROLLER && isLeader()))
- {
- initHandlers();
- }
- }
-
- private void handleNewSessionAsParticipant()
- {
- // In case there is a live instance record on zookeeper
- Builder keyBuilder = _helixAccessor.keyBuilder();
-
- if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null)
- {
- logger.warn("Found another instance with same instanceName: " + _instanceName
- + " in cluster " + _clusterName);
- // Wait for a while, in case previous storage node exits unexpectedly
- // and its liveinstance
- // still hangs around until session timeout happens
- try
- {
- Thread.sleep(_sessionTimeout + 5000);
- }
- catch (InterruptedException e)
- {
- logger.warn("Sleep interrupted while waiting for previous liveinstance to go away.",
- e);
- }
-
- if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null)
- {
- String errorMessage =
- "instance " + _instanceName + " already has a liveinstance in cluster "
- + _clusterName;
- logger.error(errorMessage);
- throw new HelixException(errorMessage);
- }
- }
- // Invoke the PreConnectCallbacks
- for (PreConnectCallback callback : _preConnectCallbacks)
- {
- callback.onPreConnect();
- }
- addLiveInstance();
- carryOverPreviousCurrentState();
-
- // In case the cluster manager is running as a participant, setup message
- // listener
- _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
- _stateMachEngine);
- addMessageListener(_messagingService.getExecutor(), _instanceName);
- addControllerListener(_helixAccessor);
-
- if (_participantHealthCheckInfoCollector == null)
- {
- _participantHealthCheckInfoCollector =
- new ParticipantHealthReportCollectorImpl(this, _instanceName);
- _participantHealthCheckInfoCollector.start();
- }
- // start the participant health check timer, also create zk path for health
- // check info
- String healthCheckInfoPath =
- _helixAccessor.keyBuilder().healthReports(_instanceName).getPath();
- if (!_zkClient.exists(healthCheckInfoPath))
- {
- _zkClient.createPersistent(healthCheckInfoPath, true);
- logger.info("Creating healthcheck info path " + healthCheckInfoPath);
- }
- }
-
- @Override
- public void addPreConnectCallback(PreConnectCallback callback)
- {
- logger.info("Adding preconnect callback");
- _preConnectCallbacks.add(callback);
- }
-
- private void resetHandlers()
- {
- synchronized (this)
- {
- // get a copy of the list and iterate over the copy list
- // in case handler.reset() will modify the original handler list
- List<CallbackHandler> handlers = new ArrayList<CallbackHandler>();
- handlers.addAll(_handlers);
-
- for (CallbackHandler handler : handlers)
- {
- handler.reset();
- logger.info("reset handler: " + handler.getPath() + " by "
- + Thread.currentThread().getName());
- }
- }
- }
-
- private void initHandlers()
- {
- // may add new currentState and message listeners during init()
- // so make a copy and iterate over the copy
- synchronized (this)
- {
- List<CallbackHandler> handlers = new ArrayList<CallbackHandler>();
- handlers.addAll(_handlers);
- for (CallbackHandler handler : handlers)
- {
- handler.init();
- }
- }
- }
-
- private void addListener(CallbackHandler handler)
- {
- synchronized (this)
- {
- _handlers.add(handler);
- logger.info("add handler: " + handler.getPath() + " by "
- + Thread.currentThread().getName());
- }
- }
-
- @Override
- public boolean isLeader()
- {
- if (!isConnected())
- {
- return false;
- }
-
- if (_instanceType != InstanceType.CONTROLLER)
- {
- return false;
- }
-
- Builder keyBuilder = _helixAccessor.keyBuilder();
- LiveInstance leader = _helixAccessor.getProperty(keyBuilder.controllerLeader());
- if (leader == null)
- {
- return false;
- }
- else
- {
- String leaderName = leader.getInstanceName();
- // TODO need check sessionId also, but in distributed mode, leader's
- // sessionId is
- // not equal to
- // the leader znode's sessionId field which is the sessionId of the
- // controller_participant that
- // successfully creates the leader node
- if (leaderName == null || !leaderName.equals(_instanceName))
- {
- return false;
- }
- }
- return true;
- }
-
- private void carryOverPreviousCurrentState()
- {
- Builder keyBuilder = _helixAccessor.keyBuilder();
-
- List<String> subPaths =
- _helixAccessor.getChildNames(keyBuilder.sessions(_instanceName));
- for (String previousSessionId : subPaths)
- {
- List<CurrentState> previousCurrentStates =
- _helixAccessor.getChildValues(keyBuilder.currentStates(_instanceName,
- previousSessionId));
-
- for (CurrentState previousCurrentState : previousCurrentStates)
- {
- if (!previousSessionId.equalsIgnoreCase(_sessionId))
- {
- logger.info("Carrying over old session:" + previousSessionId + " resource "
- + previousCurrentState.getId() + " to new session:" + _sessionId);
- String stateModelDefRef = previousCurrentState.getStateModelDefRef();
- if (stateModelDefRef == null)
- {
- logger.error("pervious current state doesn't have a state model def. skip it. prevCS: "
- + previousCurrentState);
- continue;
- }
- StateModelDefinition stateModel =
- _helixAccessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef));
- for (String partitionName : previousCurrentState.getPartitionStateMap()
- .keySet())
- {
-
- previousCurrentState.setState(partitionName, stateModel.getInitialState());
- }
- previousCurrentState.setSessionId(_sessionId);
- _helixAccessor.setProperty(keyBuilder.currentState(_instanceName,
- _sessionId,
- previousCurrentState.getId()),
- previousCurrentState);
- }
- }
- }
- // Deleted old current state
- for (String previousSessionId : subPaths)
- {
- if (!previousSessionId.equalsIgnoreCase(_sessionId))
- {
- String path =
- _helixAccessor.keyBuilder()
- .currentStates(_instanceName, previousSessionId)
- .getPath();
- logger.info("Deleting previous current state. path: " + path + "/"
- + previousSessionId);
- _zkClient.deleteRecursive(path);
-
- }
- }
- }
-
- @Deprecated
- @Override
- public synchronized PropertyStore<ZNRecord> getPropertyStore()
- {
- checkConnected();
-
- if (_propertyStore == null)
- {
- String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
-
- // reuse the existing zkClient because its serializer will use raw serialization
- // for paths of the property store.
- _propertyStore =
- new ZKPropertyStore<ZNRecord>(_zkClient, new ZNRecordJsonSerializer(), path);
- }
-
- return _propertyStore;
- }
-
- @Override
- public synchronized ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore()
- {
- checkConnected();
-
- if (_helixPropertyStore == null)
- {
- String path =
- PropertyPathConfig.getPath(PropertyType.HELIX_PROPERTYSTORE, _clusterName);
-
- _helixPropertyStore =
- new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkClient),
- path,
- null);
- }
-
- return _helixPropertyStore;
- }
-
- @Override
- public synchronized HelixAdmin getClusterManagmentTool()
- {
- checkConnected();
- if (_zkClient != null)
- {
- _managementTool = new ZKHelixAdmin(_zkClient);
- }
- else
- {
- logger.error("Couldn't get ZKClusterManagementTool because zkClient is null");
- }
-
- return _managementTool;
- }
-
- @Override
- public ClusterMessagingService getMessagingService()
- {
- // The caller can register message handler factories on messaging service before the
- // helix manager is connected. Thus we do not do connected check here.
- return _messagingService;
- }
-
- @Override
- public ParticipantHealthReportCollector getHealthReportCollector()
- {
- checkConnected();
- return _participantHealthCheckInfoCollector;
- }
-
- @Override
- public InstanceType getInstanceType()
- {
- return _instanceType;
- }
-
- private void checkConnected()
- {
- if (!isConnected())
- {
- throw new HelixException("ClusterManager not connected. Call clusterManager.connect()");
- }
- }
-
- @Override
- public String getVersion()
- {
- return _version;
- }
-
- @Override
- public StateMachineEngine getStateMachineEngine()
- {
- return _stateMachEngine;
- }
-
- protected List<CallbackHandler> getHandlers()
- {
- return _handlers;
- }
-
- // TODO: rename this and not expose this function as part of interface
- @Override
- public void startTimerTasks()
- {
- for (HelixTimerTask task : _controllerTimerTasks)
- {
- task.start();
- }
- startStatusUpdatedumpTask();
- }
-
- @Override
- public void stopTimerTasks()
- {
- for (HelixTimerTask task : _controllerTimerTasks)
- {
- task.stop();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKUtil.java
deleted file mode 100644
index 14040c6..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKUtil.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.ConfigScope.ConfigScopeProperty;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-
-public final class ZKUtil
-{
- private static Logger logger = Logger.getLogger(ZKUtil.class);
- private static int RETRYLIMIT = 3;
-
- private ZKUtil()
- {
- }
-
- public static boolean isClusterSetup(String clusterName, ZkClient zkClient)
- {
- if (clusterName == null || zkClient == null)
- {
- return false;
- }
-
- boolean isValid =
- zkClient.exists(PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName))
- && zkClient.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS,
- clusterName,
- ConfigScopeProperty.CLUSTER.toString(),
- clusterName))
- && zkClient.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS,
- clusterName,
- ConfigScopeProperty.PARTICIPANT.toString()))
- && zkClient.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS,
- clusterName,
- ConfigScopeProperty.RESOURCE.toString()))
- && zkClient.exists(PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE,
- clusterName))
- && zkClient.exists(PropertyPathConfig.getPath(PropertyType.LIVEINSTANCES,
- clusterName))
- && zkClient.exists(PropertyPathConfig.getPath(PropertyType.INSTANCES,
- clusterName))
- && zkClient.exists(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW,
- clusterName))
- && zkClient.exists(PropertyPathConfig.getPath(PropertyType.CONTROLLER,
- clusterName))
- && zkClient.exists(PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS,
- clusterName))
- && zkClient.exists(PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER,
- clusterName))
- && zkClient.exists(PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER,
- clusterName))
- && zkClient.exists(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER,
- clusterName))
- && zkClient.exists(PropertyPathConfig.getPath(PropertyType.HISTORY,
- clusterName));
-
- return isValid;
- }
-
- public static void createChildren(ZkClient client,
- String parentPath,
- List<ZNRecord> list)
- {
- client.createPersistent(parentPath, true);
- if (list != null)
- {
- for (ZNRecord record : list)
- {
- createChildren(client, parentPath, record);
- }
- }
- }
-
- public static void createChildren(ZkClient client,
- String parentPath,
- ZNRecord nodeRecord)
- {
- client.createPersistent(parentPath, true);
-
- String id = nodeRecord.getId();
- String temp = parentPath + "/" + id;
- client.createPersistent(temp, nodeRecord);
- }
-
- public static void dropChildren(ZkClient client, String parentPath, List<ZNRecord> list)
- {
- // TODO: check if parentPath exists
- if (list != null)
- {
- for (ZNRecord record : list)
- {
- dropChildren(client, parentPath, record);
- }
- }
- }
-
- public static void dropChildren(ZkClient client, String parentPath, ZNRecord nodeRecord)
- {
- // TODO: check if parentPath exists
- String id = nodeRecord.getId();
- String temp = parentPath + "/" + id;
- client.deleteRecursive(temp);
- }
-
- public static List<ZNRecord> getChildren(ZkClient client, String path)
- {
- // parent watch will be set by zkClient
- List<String> children = client.getChildren(path);
- if (children == null || children.size() == 0)
- {
- return Collections.emptyList();
- }
-
- List<ZNRecord> childRecords = new ArrayList<ZNRecord>();
- for (String child : children)
- {
- String childPath = path + "/" + child;
- Stat newStat = new Stat();
- ZNRecord record = client.readDataAndStat(childPath, newStat, true);
- if (record != null)
- {
- record.setVersion(newStat.getVersion());
- record.setCreationTime(newStat.getCtime());
- record.setModifiedTime(newStat.getMtime());
- childRecords.add(record);
- }
- }
- return childRecords;
- }
-
- public static void updateIfExists(ZkClient client,
- String path,
- final ZNRecord record,
- boolean mergeOnUpdate)
- {
- if (client.exists(path))
- {
- DataUpdater<Object> updater = new DataUpdater<Object>()
- {
- @Override
- public Object update(Object currentData)
- {
- return record;
- }
- };
- client.updateDataSerialized(path, updater);
- }
- }
-
- public static void createOrUpdate(ZkClient client,
- String path,
- final ZNRecord record,
- final boolean persistent,
- final boolean mergeOnUpdate)
- {
- int retryCount = 0;
- while (retryCount < RETRYLIMIT)
- {
- try
- {
- if (client.exists(path))
- {
- DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>()
- {
- @Override
- public ZNRecord update(ZNRecord currentData)
- {
- if (currentData != null && mergeOnUpdate)
- {
- currentData.merge(record);
- return currentData;
- }
- return record;
- }
- };
- client.updateDataSerialized(path, updater);
- }
- else
- {
- CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
- if (record.getDeltaList().size() > 0)
- {
- ZNRecord value = new ZNRecord(record.getId());
- value.merge(record);
- client.create(path, value, mode);
- }
- else
- {
- client.create(path, record, mode);
- }
- }
- break;
- }
- catch (Exception e)
- {
- retryCount = retryCount + 1;
- logger.warn("Exception trying to update " + path + " Exception:" + e.getMessage()
- + ". Will retry.");
- }
- }
- }
-
- public static void asyncCreateOrUpdate(ZkClient client,
- String path,
- final ZNRecord record,
- final boolean persistent,
- final boolean mergeOnUpdate)
- {
- try
- {
- if (client.exists(path))
- {
- if (mergeOnUpdate)
- {
- ZNRecord curRecord = client.readData(path);
- if (curRecord != null)
- {
- curRecord.merge(record);
- client.asyncSetData(path, curRecord, -1, null);
- }
- else
- {
- client.asyncSetData(path, record, -1, null);
- }
- }
- else
- {
- client.asyncSetData(path, record, -1, null);
- }
- }
- else
- {
- CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
- if (record.getDeltaList().size() > 0)
- {
- ZNRecord newRecord = new ZNRecord(record.getId());
- newRecord.merge(record);
- client.create(path, null, mode);
-
- client.asyncSetData(path, newRecord, -1, null);
- }
- else
- {
- client.create(path, null, mode);
-
- client.asyncSetData(path, record, -1, null);
- }
- }
- }
- catch (Exception e)
- {
- logger.error("Exception in async create or update " + path + ". Exception: "
- + e.getMessage() + ". Give up.");
- }
- }
-
- public static void createOrReplace(ZkClient client,
- String path,
- final ZNRecord record,
- final boolean persistent)
- {
- int retryCount = 0;
- while (retryCount < RETRYLIMIT)
- {
- try
- {
- if (client.exists(path))
- {
- DataUpdater<Object> updater = new DataUpdater<Object>()
- {
- @Override
- public Object update(Object currentData)
- {
- return record;
- }
- };
- client.updateDataSerialized(path, updater);
- }
- else
- {
- CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
- client.create(path, record, mode);
- }
- break;
- }
- catch (Exception e)
- {
- retryCount = retryCount + 1;
- logger.warn("Exception trying to createOrReplace " + path + " Exception:"
- + e.getMessage() + ". Will retry.");
- }
- }
- }
-
- public static void subtract(ZkClient client,
- String path,
- final ZNRecord recordTosubtract)
- {
- int retryCount = 0;
- while (retryCount < RETRYLIMIT)
- {
- try
- {
- if (client.exists(path))
- {
- DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>()
- {
- @Override
- public ZNRecord update(ZNRecord currentData)
- {
- currentData.subtract(recordTosubtract);
- return currentData;
- }
- };
- client.updateDataSerialized(path, updater);
- break;
- }
- }
- catch (Exception e)
- {
- retryCount = retryCount + 1;
- logger.warn("Exception trying to createOrReplace " + path + " Exception:"
- + e.getMessage() + ". Will retry.");
- e.printStackTrace();
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZNRecordSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZNRecordSerializer.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZNRecordSerializer.java
deleted file mode 100644
index 7575d40..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZNRecordSerializer.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.io.ByteArrayInputStream;
-import java.io.StringWriter;
-import java.util.List;
-import java.util.Map;
-
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.ZNRecord;
-
-public class ZNRecordSerializer implements ZkSerializer
-{
- private static Logger logger = Logger.getLogger(ZNRecordSerializer.class);
-
- private static int getListFieldBound(ZNRecord record)
- {
- int max = Integer.MAX_VALUE;
- if (record.getSimpleFields().containsKey(ZNRecord.LIST_FIELD_BOUND))
- {
- String maxStr = record.getSimpleField(ZNRecord.LIST_FIELD_BOUND);
- try
- {
- max = Integer.parseInt(maxStr);
- }
- catch (Exception e)
- {
- logger.error("IllegalNumberFormat for list field bound: " + maxStr);
- }
- }
- return max;
- }
-
- @Override
- public byte[] serialize(Object data)
- {
- if (!(data instanceof ZNRecord))
- {
- // null is NOT an instance of any class
- logger.error("Input object must be of type ZNRecord but it is " + data + ". Will not write to zk");
- throw new HelixException("Input object is not of type ZNRecord (was " + data + ")");
- }
-
- ZNRecord record = (ZNRecord) data;
-
- // apply retention policy
- int max = getListFieldBound(record);
- if (max < Integer.MAX_VALUE)
- {
- Map<String, List<String>> listMap = record.getListFields();
- for (String key : listMap.keySet())
- {
- List<String> list = listMap.get(key);
- if (list.size() > max)
- {
- listMap.put(key, list.subList(0, max));
- }
- }
- }
-
- // do serialization
- ObjectMapper mapper = new ObjectMapper();
- SerializationConfig serializationConfig = mapper.getSerializationConfig();
- serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
- serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
- serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
- StringWriter sw = new StringWriter();
- try
- {
- mapper.writeValue(sw, data);
- } catch (Exception e)
- {
- logger.error("Exception during data serialization. Will not write to zk. Data (first 1k): "
- + sw.toString().substring(0, 1024), e);
- throw new HelixException(e);
- }
-
- if (sw.toString().getBytes().length > ZNRecord.SIZE_LIMIT)
- {
- logger.error("Data size larger than 1M, ZNRecord.id: " + record.getId()
- + ". Will not write to zk. Data (first 1k): " + sw.toString().substring(0, 1024));
- throw new HelixException("Data size larger than 1M, ZNRecord.id: " + record.getId());
- }
- return sw.toString().getBytes();
- }
-
- @Override
- public Object deserialize(byte[] bytes)
- {
- if (bytes == null || bytes.length == 0)
- {
- // reading a parent/null node
- return null;
- }
-
- ObjectMapper mapper = new ObjectMapper();
- ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-
- DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
- deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
- deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
- deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
- try
- {
- ZNRecord zn = mapper.readValue(bais, ZNRecord.class);
- return zn;
- } catch (Exception e)
- {
- logger.error("Exception during deserialization of bytes: " + new String(bytes), e);
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZNRecordStreamingSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZNRecordStreamingSerializer.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZNRecordStreamingSerializer.java
deleted file mode 100644
index 32ede77..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZNRecordStreamingSerializer.java
+++ /dev/null
@@ -1,300 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.io.ByteArrayInputStream;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
-
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.ZNRecord;
-
-public class ZNRecordStreamingSerializer implements ZkSerializer
-{
- private static Logger LOG = Logger.getLogger(ZNRecordStreamingSerializer.class);
-
- private static int getListFieldBound(ZNRecord record)
- {
- int max = Integer.MAX_VALUE;
- if (record.getSimpleFields().containsKey(ZNRecord.LIST_FIELD_BOUND))
- {
- String maxStr = record.getSimpleField(ZNRecord.LIST_FIELD_BOUND);
- try
- {
- max = Integer.parseInt(maxStr);
- }
- catch (Exception e)
- {
- LOG.error("IllegalNumberFormat for list field bound: " + maxStr);
- }
- }
- return max;
- }
-
- @Override
- public byte[] serialize(Object data) throws ZkMarshallingError
- {
- if (!(data instanceof ZNRecord))
- {
- // null is NOT an instance of any class
- LOG.error("Input object must be of type ZNRecord but it is " + data + ". Will not write to zk");
- throw new HelixException("Input object is not of type ZNRecord (was " + data + ")");
- }
-
- // apply retention policy on list field
- ZNRecord record = (ZNRecord) data;
- int max = getListFieldBound(record);
- if (max < Integer.MAX_VALUE)
- {
- Map<String, List<String>> listMap = record.getListFields();
- for (String key : listMap.keySet())
- {
- List<String> list = listMap.get(key);
- if (list.size() > max)
- {
- listMap.put(key, list.subList(0, max));
- }
- }
- }
-
- StringWriter sw = new StringWriter();
- try
- {
- JsonFactory f = new JsonFactory();
- JsonGenerator g = f.createJsonGenerator(sw);
-
- g.writeStartObject();
-
- // write id field
- g.writeRaw("\n ");
- g.writeStringField("id", record.getId());
-
- // write simepleFields
- g.writeRaw("\n ");
- g.writeObjectFieldStart("simpleFields");
- for (String key : record.getSimpleFields().keySet())
- {
- g.writeRaw("\n ");
- g.writeStringField(key, record.getSimpleField(key));
- }
- g.writeRaw("\n ");
- g.writeEndObject(); // for simpleFields
-
- // write listFields
- g.writeRaw("\n ");
- g.writeObjectFieldStart("listFields");
- for (String key : record.getListFields().keySet())
- {
- // g.writeStringField(key, record.getListField(key).toString());
-
- // g.writeObjectFieldStart(key);
- g.writeRaw("\n ");
- g.writeArrayFieldStart(key);
- List<String> list = record.getListField(key);
- for (String listValue : list)
- {
- g.writeString(listValue);
- }
- // g.writeEndObject();
- g.writeEndArray();
-
- }
- g.writeRaw("\n ");
- g.writeEndObject(); // for listFields
-
- // write mapFields
- g.writeRaw("\n ");
- g.writeObjectFieldStart("mapFields");
- for (String key : record.getMapFields().keySet())
- {
- // g.writeStringField(key, record.getMapField(key).toString());
- g.writeRaw("\n ");
- g.writeObjectFieldStart(key);
- Map<String, String> map = record.getMapField(key);
- for (String mapKey : map.keySet())
- {
- g.writeRaw("\n ");
- g.writeStringField(mapKey, map.get(mapKey));
- }
- g.writeRaw("\n ");
- g.writeEndObject();
-
- }
- g.writeRaw("\n ");
- g.writeEndObject(); // for mapFields
-
- g.writeRaw("\n");
- g.writeEndObject(); // for whole znrecord
-
- // important: will force flushing of output, close underlying output
- // stream
- g.close();
- }
- catch (Exception e)
- {
- LOG.error("Exception during data serialization. Will not write to zk. Data (first 1k): "
- + sw.toString().substring(0, 1024), e);
- throw new HelixException(e);
- }
-
- // check size
- if (sw.toString().getBytes().length > ZNRecord.SIZE_LIMIT)
- {
- LOG.error("Data size larger than 1M, ZNRecord.id: " + record.getId()
- + ". Will not write to zk. Data (first 1k): " + sw.toString().substring(0, 1024));
- throw new HelixException("Data size larger than 1M, ZNRecord.id: " + record.getId());
- }
-
- return sw.toString().getBytes();
- }
-
- @Override
- public Object deserialize(byte[] bytes) throws ZkMarshallingError
- {
- if (bytes == null || bytes.length == 0)
- {
- LOG.error("ZNode is empty.");
- return null;
- }
-
- ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
- ZNRecord record = null;
-
- try
- {
- JsonFactory f = new JsonFactory();
- JsonParser jp = f.createJsonParser(bais);
-
- jp.nextToken(); // will return JsonToken.START_OBJECT (verify?)
- while (jp.nextToken() != JsonToken.END_OBJECT)
- {
- String fieldname = jp.getCurrentName();
- jp.nextToken(); // move to value, or START_OBJECT/START_ARRAY
- if ("id".equals(fieldname))
- {
- // contains an object
- record = new ZNRecord(jp.getText());
- }
- else if ("simpleFields".equals(fieldname))
- {
- while (jp.nextToken() != JsonToken.END_OBJECT)
- {
- String key = jp.getCurrentName();
- jp.nextToken(); // move to value
- record.setSimpleField(key, jp.getText());
- }
- }
- else if ("mapFields".equals(fieldname))
- {
- // user.setVerified(jp.getCurrentToken() == JsonToken.VALUE_TRUE);
- while (jp.nextToken() != JsonToken.END_OBJECT)
- {
- String key = jp.getCurrentName();
- record.setMapField(key, new TreeMap<String, String>());
- jp.nextToken(); // move to value
-
- while (jp.nextToken() != JsonToken.END_OBJECT)
- {
- String mapKey = jp.getCurrentName();
- jp.nextToken(); // move to value
- record.getMapField(key).put(mapKey, jp.getText());
- }
- }
-
- }
- else if ("listFields".equals(fieldname))
- {
- // user.setUserImage(jp.getBinaryValue());
- while (jp.nextToken() != JsonToken.END_OBJECT)
- {
- String key = jp.getCurrentName();
- record.setListField(key, new ArrayList<String>());
- jp.nextToken(); // move to value
- while (jp.nextToken() != JsonToken.END_ARRAY)
- {
- record.getListField(key).add(jp.getText());
- }
-
- }
-
- }
- else
- {
- throw new IllegalStateException("Unrecognized field '" + fieldname + "'!");
- }
- }
- jp.close(); // ensure resources get cleaned up timely and properly
- }
- catch (Exception e)
- {
- LOG.error("Exception during deserialization of bytes: " + new String(bytes), e);
- }
-
- return record;
- }
-
- public static void main(String[] args)
- {
- ZNRecord record = new ZNRecord("record");
- final int recordSize = 10;
- for (int i = 0; i < recordSize; i++)
- {
- record.setSimpleField("" + i, "" + i);
- record.setListField("" + i, new ArrayList<String>());
- for (int j = 0; j < recordSize; j++)
- {
- record.getListField("" + i).add("" + j);
- }
-
- record.setMapField("" + i, new TreeMap<String, String>());
- for (int j = 0; j < recordSize; j++)
- {
- record.getMapField("" + i).put("" + j, "" + j);
- }
- }
-
- ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
- byte[] bytes = serializer.serialize(record);
- System.out.println(new String(bytes));
- ZNRecord record2 = (ZNRecord) serializer.deserialize(bytes);
- System.out.println(record2);
-
- long start = System.currentTimeMillis();
- for (int i = 0; i < 100; i++)
- {
- bytes = serializer.serialize(record);
- // System.out.println(new String(bytes));
- record2 = (ZNRecord) serializer.deserialize(bytes);
- // System.out.println(record2);
- }
- long end = System.currentTimeMillis();
- System.out.println("ZNRecordStreamingSerializer time used: " + (end - start));
-
- ZNRecordSerializer serializer2 = new ZNRecordSerializer();
- bytes = serializer2.serialize(record);
- // System.out.println(new String(bytes));
- record2 = (ZNRecord) serializer2.deserialize(bytes);
- // System.out.println(record2);
-
- start = System.currentTimeMillis();
- for (int i = 0; i < 100; i++)
- {
- bytes = serializer2.serialize(record);
- // System.out.println(new String(bytes));
- record2 = (ZNRecord) serializer2.deserialize(bytes);
- // System.out.println(record2);
- }
- end = System.currentTimeMillis();
- System.out.println("ZNRecordSerializer time used: " + (end - start));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkAsyncCallbacks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkAsyncCallbacks.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkAsyncCallbacks.java
deleted file mode 100644
index 9d8d179..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkAsyncCallbacks.java
+++ /dev/null
@@ -1,171 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.AsyncCallback.DataCallback;
-import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.AsyncCallback.VoidCallback;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.data.Stat;
-
-public class ZkAsyncCallbacks
-{
- private static Logger LOG = Logger.getLogger(ZkAsyncCallbacks.class);
-
- static class GetDataCallbackHandler extends DefaultCallback implements DataCallback
- {
- byte[] _data;
- Stat _stat;
-
- @Override
- public void handle()
- {
- // TODO Auto-generated method stub
- }
-
- @Override
- public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat)
- {
- if (rc == 0)
- {
- _data = data;
- _stat = stat;
- }
- callback(rc, path, ctx);
- }
- }
-
- static class SetDataCallbackHandler extends DefaultCallback implements StatCallback
- {
- Stat _stat;
-
- @Override
- public void handle()
- {
- // TODO Auto-generated method stub
- }
-
- @Override
- public void processResult(int rc, String path, Object ctx, Stat stat)
- {
- if (rc == 0)
- {
- _stat = stat;
- }
- callback(rc, path, ctx);
- }
-
- public Stat getStat()
- {
- return _stat;
- }
- }
-
- static class ExistsCallbackHandler extends DefaultCallback implements StatCallback
- {
- Stat _stat;
-
- @Override
- public void handle()
- {
- // TODO Auto-generated method stub
- }
-
- @Override
- public void processResult(int rc, String path, Object ctx, Stat stat)
- {
- if (rc == 0)
- {
- _stat = stat;
- }
- callback(rc, path, ctx);
- }
-
- }
-
- static class CreateCallbackHandler extends DefaultCallback implements StringCallback
- {
- @Override
- public void processResult(int rc, String path, Object ctx, String name)
- {
- callback(rc, path, ctx);
- }
-
- @Override
- public void handle()
- {
- // TODO Auto-generated method stub
- }
- }
-
- static class DeleteCallbackHandler extends DefaultCallback implements VoidCallback
- {
- @Override
- public void processResult(int rc, String path, Object ctx)
- {
- callback(rc, path, ctx);
- }
-
- @Override
- public void handle()
- {
- // TODO Auto-generated method stub
- }
-
- }
-
- /**
- * Default callback for zookeeper async api
- */
- static abstract class DefaultCallback
- {
- AtomicBoolean _lock = new AtomicBoolean(false);
- int _rc = -1;
-
- public void callback(int rc, String path, Object ctx)
- {
- if (rc != 0)
- {
- LOG.warn(this + ", rc:" + Code.get(rc) + ", path: " + path);
- }
- _rc = rc;
- handle();
-
- synchronized (_lock)
- {
- _lock.set(true);
- _lock.notify();
- }
- }
-
- public boolean waitForSuccess()
- {
- try
- {
- synchronized (_lock)
- {
- while (!_lock.get())
- {
- _lock.wait();
- }
- }
- }
- catch (InterruptedException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- return true;
- }
-
- public int getRc()
- {
- return _rc;
- }
-
- abstract public void handle();
- }
-
-}