Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:59 UTC

[42/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/EspressoStorageMockNode.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/EspressoStorageMockNode.java b/mockservice/src/main/java/org/apache/helix/EspressoStorageMockNode.java
new file mode 100644
index 0000000..d172d58
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/EspressoStorageMockNode.java
@@ -0,0 +1,194 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.healthcheck.StatHealthReportProvider;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.log4j.Logger;
+
+public class EspressoStorageMockNode extends MockNode {
+
+	private static final Logger logger = Logger
+			.getLogger(EspressoStorageMockNode.class);
+
+	StatHealthReportProvider _healthProvider;
+	EspressoStorageMockStateModelFactory _stateModelFactory;
+
+	HashSet<String> _partitions;
+
+	ConcurrentHashMap<String, String> _keyValueMap;
+	FnvHashFunction _hashFunction;
+	int _numTotalEspressoPartitions = 0;
+
+	public EspressoStorageMockNode(CMConnector cm) {
+		super(cm);
+		_stateModelFactory = new EspressoStorageMockStateModelFactory(0);
+
+		// register the MasterSlave state model factory with the Helix engine
+		StateMachineEngine stateMach = _cmConnector.getManager().getStateMachineEngine();
+		stateMach.registerStateModelFactory("MasterSlave", _stateModelFactory);
+
+		_healthProvider = new StatHealthReportProvider();
+
+		_cmConnector.getManager().getHealthReportCollector()
+				.addHealthReportProvider(_healthProvider);
+		_partitions = new HashSet<String>();
+		_keyValueMap = new ConcurrentHashMap<String, String>();
+		_hashFunction = new FnvHashFunction();
+
+		//start thread to keep checking what partitions this node owns
+		//Thread partitionGetter = new Thread(new PartitionGetterThread());
+		//partitionGetter.start();
+		//logger.debug("set partition getter thread to run");
+	}
+
+	public String formStatName(String dbName, String partitionName, String metricName)
+	{
+		return "db" + dbName + ".partition" + partitionName + "." + metricName;
+	}
+
+	public String doGet(String dbId, String key) {
+		String partition = getPartitionName(dbId, getKeyPartition(dbId, key));
+		if (!isPartitionOwnedByNode(partition)) {
+			logger.error("Key "+key+" hashed to partition "+partition +" but this node does not own it.");
+			return null;
+		}
+
+		_healthProvider.incrementStat(formStatName(dbId, partition, "getCount"), String.valueOf(System.currentTimeMillis()));
+		return _keyValueMap.get(key);
+	}
+
+	public void doPut(String dbId, String key, String value) {
+		String partition = getPartitionName(dbId, getKeyPartition(dbId, key));
+		if (!isPartitionOwnedByNode(partition)) {
+			logger.error("Key "+key+" hashed to partition "+partition +" but this node does not own it.");
+			return;
+		}
+
+		_healthProvider.incrementStat(formStatName(dbId, partition, "putCount"), String.valueOf(System.currentTimeMillis()));
+
+		_keyValueMap.put(key, value);
+	}
+
+	private String getPartitionName(String databaseName, int partitionNum) {
+		return databaseName+"_"+partitionNum;
+	}
+
+	private boolean isPartitionOwnedByNode(String partitionName) {
+		Map<String, StateModel> stateModelMap = _stateModelFactory
+				.getStateModelMap();
+		logger.debug("state model map size: "+stateModelMap.size());
+
+		return stateModelMap.containsKey(partitionName);
+	}
+
+	private int getKeyPartition(String dbName, String key) {
+		int numPartitions = getNumPartitions(dbName);
+		logger.debug("numPartitions: "+numPartitions);
+		int part = Math.abs((int)_hashFunction.hash(key.getBytes(), numPartitions));
+		logger.debug("part: "+part);
+		return part;
+	}
+
+	private int getNumPartitions(String dbName) {
+		logger.debug("dbName: "+dbName);
+		HelixDataAccessor helixDataAccessor = _cmConnector.getManager().getHelixDataAccessor();
+		Builder keyBuilder = helixDataAccessor.keyBuilder();
+		ZNRecord rec = helixDataAccessor.getProperty(keyBuilder.idealStates(dbName)).getRecord();
+		if (rec == null) {
+			// no ideal state for this db; treat it as having zero partitions
+			logger.error("No ideal state found for db "+dbName);
+			return 0;
+		}
+		IdealState state = new IdealState(rec);
+		return state.getNumPartitions();
+	}
+
+	class PartitionGetterThread implements Runnable {
+
+		@Override
+		public void run() {
+			while (true) {
+				synchronized (_partitions) {
+					//logger.debug("Building partition map");
+					_partitions.clear();
+					Map<String, StateModel> stateModelMap = _stateModelFactory
+							.getStateModelMap();
+					for (String s: stateModelMap.keySet()) {
+						logger.debug("adding key "+s);
+						_partitions.add(s);
+					}
+				}
+				//sleep for 60 seconds between refreshes
+				try {
+					Thread.sleep(60000);
+				} catch (InterruptedException e) {
+					// restore the interrupt status and stop polling
+					Thread.currentThread().interrupt();
+					return;
+				}
+			}
+		}
+	}
+
+	@Override
+	public void run() {
+
+	}
+}
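
For reference, the stat-name convention used by formStatName() above can be exercised with a small standalone sketch (the class and values here are hypothetical, not part of this commit):

    // Hypothetical sketch mirroring EspressoStorageMockNode.formStatName
    public class StatNameExample {
        static String formStatName(String dbName, String partitionName, String metricName) {
            return "db" + dbName + ".partition" + partitionName + "." + metricName;
        }

        public static void main(String[] args) {
            // prints: dbMyDB.partitionMyDB_7.getCount
            System.out.println(formStatName("MyDB", "MyDB_7", "getCount"));
        }
    }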

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/EspressoStorageMockStateModelFactory.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/EspressoStorageMockStateModelFactory.java b/mockservice/src/main/java/org/apache/helix/EspressoStorageMockStateModelFactory.java
new file mode 100644
index 0000000..2a807db
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/EspressoStorageMockStateModelFactory.java
@@ -0,0 +1,99 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+@SuppressWarnings("rawtypes")
+public class EspressoStorageMockStateModelFactory extends StateModelFactory<StateModel> {
+	int _delay;
+
+	public EspressoStorageMockStateModelFactory(int delay) {
+		_delay = delay;
+	}
+
+	@Override
+	public StateModel createNewStateModel(String stateUnitKey) {
+		EspressoStorageMockStateModel stateModel = new EspressoStorageMockStateModel();
+		stateModel.setDelay(_delay);
+		stateModel.setStateUnitKey(stateUnitKey);
+		return stateModel;
+	}
+
+	public static class EspressoStorageMockStateModel extends StateModel {
+		int _transDelay = 0;
+		String stateUnitKey;
+		
+		public String getStateUnitKey() {
+			return stateUnitKey;
+		}
+
+		public void setStateUnitKey(String stateUnitKey) {
+			this.stateUnitKey = stateUnitKey;
+		}
+
+		public void setDelay(int delay) {
+			_transDelay = delay > 0 ? delay : 0;
+		}
+
+		public void onBecomeSlaveFromOffline(Message message,
+				NotificationContext context) {
+
+			System.out.println("EspressoStorageMockStateModel.onBecomeSlaveFromOffline() for "+ stateUnitKey);
+			sleep();
+		}
+
+		private void sleep() {
+			try {
+				Thread.sleep(_transDelay);
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+
+		public void onBecomeSlaveFromMaster(Message message,
+				NotificationContext context) {
+			System.out.println("EspressoStorageMockStateModel.onBecomeSlaveFromMaster() for "+ stateUnitKey);
+			sleep();
+
+		}
+
+		public void onBecomeMasterFromSlave(Message message,
+				NotificationContext context) {
+			System.out.println("EspressoStorageMockStateModel.onBecomeMasterFromSlave() for "+ stateUnitKey);
+			sleep();
+
+		}
+
+		public void onBecomeOfflineFromSlave(Message message,
+				NotificationContext context) {
+			System.out.println("EspressoStorageMockStateModel.onBecomeOfflineFromSlave() for "+ stateUnitKey);
+			sleep();
+
+		}
+		
+		public void onBecomeDroppedFromOffline(Message message,
+				NotificationContext context) {
+			System.out.println("EspressoStorageMockStateModel.onBecomeDroppedFromOffline() for "+ stateUnitKey);
+			sleep();
+		}
+	}
+
+}
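
A minimal sketch (class name assumed, not part of this commit) of exercising the factory outside Helix; in a live participant the state machine engine invokes these callbacks itself:

    import org.apache.helix.EspressoStorageMockStateModelFactory;
    import org.apache.helix.EspressoStorageMockStateModelFactory.EspressoStorageMockStateModel;

    public class MockStateModelExample {
        public static void main(String[] args) {
            EspressoStorageMockStateModelFactory factory =
                    new EspressoStorageMockStateModelFactory(0); // zero transition delay
            EspressoStorageMockStateModel model =
                    (EspressoStorageMockStateModel) factory.createNewStateModel("MyDB_0");
            // the mock callbacks only print and sleep, so null message/context is safe here
            model.onBecomeSlaveFromOffline(null, null);
        }
    }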

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/FnvHashFunction.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/FnvHashFunction.java b/mockservice/src/main/java/org/apache/helix/FnvHashFunction.java
new file mode 100644
index 0000000..4866a3f
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/FnvHashFunction.java
@@ -0,0 +1,201 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.nio.ByteBuffer;
+
+public class FnvHashFunction implements HashFunction
+{
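+    // 32-bit FNV-1 parameters: offset basis 0x811c9dc5 and prime 16777619 ((1 << 24) + 0x193)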
+    private static final long FNV_BASIS = 0x811c9dc5;
+    private static final long FNV_PRIME = (1 << 24) + 0x193;
+
+    @Override
+    public long hash(ByteBuffer buf)
+    {
+    	int length = buf.position() + buf.remaining();
+        return hash(buf, 0, length);
+    }
+
+    @Override
+    public long hash(ByteBuffer buf, int off, int len)
+    {
+        long hash = FNV_BASIS;
+
+        int last = Math.min(off + len, buf.position() + buf.remaining());
+
+        for (int i=off;  i< last;  i++)
+        {
+            hash ^= 0xFF & buf.get(i);
+            hash *= FNV_PRIME;
+        }
+        return hash;
+    }
+
+	public long hash(byte[] key)
+	{
+        long hash = FNV_BASIS;
+        for(int i = 0; i < key.length; i++) {
+            hash ^= 0xFF & key[i];
+            hash *= FNV_PRIME;
+        }
+
+        return hash;
+	}
+
+	@Override
+	public long hash(byte[] key, int numBuckets)
+	{
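+		// the 64-bit FNV value may be negative, so this modulo can yield a negative
+		// bucket; callers such as EspressoStorageMockNode.getKeyPartition take Math.abs()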
+		return hash(key)%numBuckets;
+	}
+
+
+	private long hash(long val)
+	{
+		long hashval = FNV_BASIS;
+
+		for (int i = 0; i < 8; i++)
+		{
+			long octet = val & 0x00ff;
+			val = val >> 8;
+
+			hashval = hashval ^ octet;
+			hashval = hashval * FNV_PRIME;
+		}
+		return hashval;
+	}
+
+	@Override
+	public long hash(long val, int numBuckets)
+	{
+		return hash(val)%numBuckets;
+	}
+
+}
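
A small usage sketch (assumed values); the same bucketing pattern appears in EspressoStorageMockNode.getKeyPartition:

    import org.apache.helix.FnvHashFunction;

    public class FnvExample {
        public static void main(String[] args) {
            FnvHashFunction fun = new FnvHashFunction();
            int numPartitions = 6; // illustrative partition count
            int part = Math.abs((int) fun.hash("member42".getBytes(), numPartitions));
            System.out.println("member42 -> partition " + part);
        }
    }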

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/HashFunction.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/HashFunction.java b/mockservice/src/main/java/org/apache/helix/HashFunction.java
new file mode 100644
index 0000000..1641212
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/HashFunction.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Forked from com.linkedin.databus.core.util @ r293057
+ * @author sdas
+ *
+ */
+public interface HashFunction {
+
+	/**
+	 * Generates a hash for the entire byte buffer.
+	 * @param buf ByteBuffer for which the hash needs to be computed
+	 * @return hash value of the buffer
+	 */
+	public long hash(ByteBuffer buf);
+
+	/**
+	 * Generates a hash for a section of the byte buffer denoted by its
+	 * endpoints.
+	 * @param buf ByteBuffer for which the hash needs to be computed
+	 * @param off starting offset
+	 * @param len length of the section for hash computation
+	 * @return the hash value for the section of the buffer
+	 */
+	public long hash(ByteBuffer buf, int off, int len);
+
+	/**
+	 * Generates a hash for the byte array and bucketizes the value to
+	 * 0..(numBuckets - 1).
+	 * @param key array to hash and bucketize
+	 * @param numBuckets number of buckets
+	 * @return the bucket in the range 0..(numBuckets - 1)
+	 */
+	public long hash(byte[] key, int numBuckets);
+
+	/**
+	 * Generates a hash for the key and bucketizes the value to
+	 * 0..(numBuckets - 1).
+	 * @param key input key for which the hash needs to be calculated
+	 * @param numBuckets number of buckets
+	 * @return the bucket in the range 0..(numBuckets - 1)
+	 */
+	public long hash(long key, int numBuckets);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/MockEspressoService.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/MockEspressoService.java b/mockservice/src/main/java/org/apache/helix/MockEspressoService.java
new file mode 100644
index 0000000..d934a44
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/MockEspressoService.java
@@ -0,0 +1,269 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.log4j.Logger;
+import org.restlet.Application;
+import org.restlet.Component;
+import org.restlet.Context;
+import org.restlet.Restlet;
+import org.restlet.Router;
+import org.restlet.data.MediaType;
+import org.restlet.data.Protocol;
+import org.restlet.data.Request;
+import org.restlet.data.Response;
+import org.restlet.resource.StringRepresentation;
+
+public class MockEspressoService extends Application
+{
+  private static final Logger logger = Logger.getLogger(MockEspressoService.class);
+	
+  public static final String HELP = "help";
+  public static final String CLUSTERNAME = "clusterName";
+  public static final String INSTANCENAME = "instanceName";
+  public static final String ZKSERVERADDRESS = "zkSvr";
+  public static final String PORT = "port";
+  public static final int DEFAULT_PORT = 8100;
+  protected static final String NODE_TYPE = "EspressoStorage";
+  //protected static final String INSTANCE_NAME = "localhost_1234";
+  
+  public static final String DATABASENAME = "database";
+  public static final String TABLENAME = "table";
+  public static final String RESOURCENAME = "resource";
+  public static final String SUBRESOURCENAME = "subresource";
+  public static final String STOPSERVICECOMMAND = "stopservice";
+  
+  public static final String CONTEXT_MOCK_NODE_NAME = "mocknode";
+  public static final String COMPONENT_NAME = "component";
+  
+  Context _applicationContext;
+  static int _serverPort;
+  static String _zkAddr = "localhost:9999";
+  static String _instanceName = "localhost";
+  static String _clusterName = "";
+  public CMConnector _connector;
+  public EspressoStorageMockNode _mockNode;
+  static Context _context;
+  static Component _component;
+
+  public MockEspressoService(Context context)
+  {
+	  super(context);
+	  _connector = null;
+
+	  try {
+		  _connector = new CMConnector(_clusterName, _instanceName, _zkAddr);
+	  }
+	  catch (Exception e) {
+		  logger.error("Unable to initialize CMConnector: "+e);
+		  e.printStackTrace();
+		  System.exit(-1);
+	  }
+	  _mockNode = (EspressoStorageMockNode)MockNodeFactory.createMockNode(NODE_TYPE, _connector);
+	  context.getAttributes().put(CONTEXT_MOCK_NODE_NAME, (Object)_mockNode);
+  }
+ 
+  @Override
+  public Restlet createRoot()
+  {
+	Router router = new Router(_context);
+
+    Restlet mainpage = new Restlet()
+    {
+      @Override
+      public void handle(Request request, Response response)
+      {
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append("<html>");
+        stringBuilder
+            .append("<head><title>Restlet Cluster Management page</title></head>");
+        stringBuilder.append("<body bgcolor=white>");
+        stringBuilder.append("<table border=\"0\">");
+        stringBuilder.append("<tr>");
+        stringBuilder.append("<td>");
+        stringBuilder.append("<h1>Rest cluster management interface V1</h1>");
+        stringBuilder.append("</td>");
+        stringBuilder.append("</tr>");
+        stringBuilder.append("</table>");
+        stringBuilder.append("</body>");
+        stringBuilder.append("</html>");
+        response.setEntity(new StringRepresentation(stringBuilder.toString(),
+            MediaType.TEXT_HTML));
+      }
+    };
+    
+    if (_mockNode == null) {
+    	logger.debug("_mockNode in createRoot is null");
+    }
+    router.attach("", mainpage);
+    
+    //Espresso handlers
+    router.attach("/{"+DATABASENAME+"}/{"+TABLENAME+"}/{"+RESOURCENAME+"}", EspressoResource.class);
+    router.attach("/{"+DATABASENAME+"}/{"+TABLENAME+"}/{"+RESOURCENAME+"}/{"+SUBRESOURCENAME+"}", EspressoResource.class);
+    
+    //Admin handlers
+    router.attach("/{"+STOPSERVICECOMMAND+"}", StopServiceResource.class);
+    
+    return router;
+  }
+  
+  public static void printUsage(Options cliOptions)
+  {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.printHelp("java " + MockEspressoService.class.getName(), cliOptions);
+  }
+
+  @SuppressWarnings("static-access")
+  private static Options constructCommandLineOptions()
+  {
+    Option helpOption = OptionBuilder.withLongOpt(HELP)
+        .withDescription("Prints command-line options info").create();
+    helpOption.setArgs(0);
+    helpOption.setRequired(false);
+    helpOption.setArgName("print help message");
+    
+    Option zkServerOption = OptionBuilder.withLongOpt(ZKSERVERADDRESS)
+        .withDescription("Provide zookeeper address").create();
+    zkServerOption.setArgs(1);
+    zkServerOption.setRequired(true);
+    zkServerOption.setArgName("ZookeeperServerAddress(Required)");
+    
+    Option clusterOption = OptionBuilder.withLongOpt(CLUSTERNAME)
+    		.withDescription("Provide cluster name").create();
+    clusterOption.setArgs(1);
+    clusterOption.setRequired(true);
+    clusterOption.setArgName("Cluster name(Required)");
+
+    Option instanceOption = OptionBuilder.withLongOpt(INSTANCENAME)
+    		.withDescription("Provide name for this instance").create();
+    instanceOption.setArgs(1);
+    instanceOption.setRequired(false);
+    instanceOption.setArgName("Instance name(Optional, defaults to localhost)");
+
+    Option portOption = OptionBuilder.withLongOpt(PORT)
+    .withDescription("Provide web service port").create();
+    portOption.setArgs(1);
+    portOption.setRequired(false);
+    portOption.setArgName("web service port, default: "+ DEFAULT_PORT);
+        
+    Options options = new Options();
+    options.addOption(helpOption);
+    options.addOption(zkServerOption);
+    options.addOption(clusterOption);
+    options.addOption(instanceOption);
+    options.addOption(portOption);
+    
+    return options;
+  }
+  
+  public static void processCommandLineArgs(String[] cliArgs) throws Exception
+  {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+    CommandLine cmd = null;
+
+    try
+    {
+      cmd = cliParser.parse(cliOptions, cliArgs);
+    } 
+    catch (ParseException pe)
+    {
+      System.err.println("MockEspressoService: failed to parse command-line options: "
+          + pe.toString());
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+    _serverPort = DEFAULT_PORT;
+    if(cmd.hasOption(HELP))
+    {
+      printUsage(cliOptions);
+      System.exit(0);
+    }
+    if(cmd.hasOption(PORT))
+    {
+      _serverPort = Integer.parseInt(cmd.getOptionValue(PORT));
+    }
+    if (cmd.hasOption(ZKSERVERADDRESS)) {
+    	_zkAddr = cmd.getOptionValue(ZKSERVERADDRESS);
+    }
+    if (cmd.hasOption(CLUSTERNAME)) {
+    	_clusterName = cmd.getOptionValue(CLUSTERNAME);
+    	logger.debug("_clusterName: "+_clusterName);
+    }
+    if (cmd.hasOption(INSTANCENAME)) {
+    	_instanceName = cmd.getOptionValue(INSTANCENAME);
+    	_instanceName = _instanceName.replace(':', '_');
+    	logger.debug("_instanceName: "+_instanceName);
+    }
+  }
+  
+  public void run() throws Exception {
+	  logger.debug("Start of mock service run");
+
+	  if (_mockNode != null) {
+		  // start the web server on the configured port
+		  _component = new Component();
+		  _component.getServers().add(Protocol.HTTP, _serverPort);
+		  // attach the application to the component and start it
+		  _component.getDefaultHost().attach(this);
+		  _context.getAttributes().put(COMPONENT_NAME, (Object)_component);
+		  _component.start();
+	  }
+	  else {
+		  logger.error("Unknown MockNode type "+NODE_TYPE);
+		  System.exit(-1);
+	  }
+	  logger.debug("mock service done");
+  }
+  
+  /**
+   * @param args
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception
+  {
+	  processCommandLineArgs(args);
+	  _context = new Context();
+	  MockEspressoService service = new MockEspressoService(_context);
+	  service.run();
+  }
+}
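
Based on the options defined above, a typical invocation would look something like this (addresses, names, and port are placeholder values):

    java org.apache.helix.MockEspressoService --zkSvr localhost:2181 --clusterName MockCluster --instanceName localhost_8100 --port 8100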

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/MockNode.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/MockNode.java b/mockservice/src/main/java/org/apache/helix/MockNode.java
new file mode 100644
index 0000000..52c35d0
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/MockNode.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+public abstract class MockNode {
+	CMConnector _cmConnector;
+	
+	public MockNode(CMConnector cm) {
+		_cmConnector = cm;
+	}
+
+	public abstract void run();
+	
+	public void disconnect() {
+		_cmConnector.disconnect();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/MockNodeFactory.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/MockNodeFactory.java b/mockservice/src/main/java/org/apache/helix/MockNodeFactory.java
new file mode 100644
index 0000000..d428dae
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/MockNodeFactory.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.log4j.Logger;
+
+public class MockNodeFactory {
+	
+	private static final Logger logger = Logger.getLogger(MockNodeFactory.class);
+	
+	public static MockNode createMockNode(String type, CMConnector cm) {
+		if (type.equals("EspressoStorage")) {
+			return new EspressoStorageMockNode(cm);
+		}
+		else {
+			logger.error("Unknown MockNode type "+type);
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/MockRunner.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/MockRunner.java b/mockservice/src/main/java/org/apache/helix/MockRunner.java
new file mode 100644
index 0000000..cf9615e
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/MockRunner.java
@@ -0,0 +1,58 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.log4j.Logger;
+
+
+/**
+ * Standalone runner that creates a mock node and connects it to the cluster.
+ */
+public class MockRunner 
+{
+	
+	private static final Logger logger = Logger.getLogger(MockRunner.class);
+	
+	protected static final String nodeType = "EspressoStorage";
+	protected static final String ZK_ADDR = "localhost:2184";
+	protected static final String INSTANCE_NAME = "localhost_1234";
+	protected static final String CLUSTER_NAME = "MockCluster";
+	
+    public static void main( String[] args )
+    {
+    	CMConnector cm = null;
+    	try {
+    		cm = new CMConnector(CLUSTER_NAME, INSTANCE_NAME, ZK_ADDR);
+    	}
+    	catch (Exception e) {
+    		logger.error("Unable to initialize CMConnector: "+e);
+    		e.printStackTrace();
+    		System.exit(-1);
+    	}
+        MockNode mock = MockNodeFactory.createMockNode(nodeType, cm);
+        if (mock != null) {
+        	mock.run();
+        }
+        else {
+        	logger.error("Unknown MockNode type "+nodeType);
+        	System.exit(-1);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/StopServiceResource.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/StopServiceResource.java b/mockservice/src/main/java/org/apache/helix/StopServiceResource.java
new file mode 100644
index 0000000..2f8d202
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/StopServiceResource.java
@@ -0,0 +1,141 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.log4j.Logger;
+import org.restlet.Component;
+import org.restlet.Context;
+import org.restlet.data.MediaType;
+import org.restlet.data.Request;
+import org.restlet.data.Response;
+import org.restlet.resource.Representation;
+import org.restlet.resource.Resource;
+import org.restlet.resource.ResourceException;
+import org.restlet.resource.StringRepresentation;
+import org.restlet.resource.Variant;
+
+
+public class StopServiceResource extends Resource {
+
+
+	private static final Logger logger = Logger
+			.getLogger(StopServiceResource.class);
+
+	Context _context;
+
+	public StopServiceResource(Context context,
+			Request request,
+			Response response) 
+	{
+		super(context, request, response);
+		getVariants().add(new Variant(MediaType.TEXT_PLAIN));
+		getVariants().add(new Variant(MediaType.APPLICATION_JSON));
+		_context = context;
+	}
+
+	public boolean allowGet()
+	{
+		System.out.println("StopServiceResource.allowGet()");
+		return true;
+	}
+
+	public boolean allowPost()
+	{
+		System.out.println("StopServiceResource.allowPost()");
+		return false;
+	}
+
+	public boolean allowPut()
+	{
+		System.out.println("StopServiceResource.allowPut()");
+		return false;
+	}
+
+	public boolean allowDelete()
+	{
+		return false;
+	}
+
+
+	class StopThread implements Runnable {
+
+		Component _component;
+		MockNode _mockNode;
+		
+		StopThread(Component c, MockNode m) {
+			_component = c;
+			_mockNode = m;
+		}
+		
+		@Override
+		public void run() {
+			try {
+				//sleep for 1 second, then end service
+				Thread.sleep(1000);
+				_component.stop();
+				_mockNode.disconnect();
+				System.exit(0);
+			} catch (Exception e) {
+				logger.error("Unable to stop service: "+e);
+				e.printStackTrace();
+			}
+		}
+		
+	}
+	
+	//XXX: handling both gets and puts here for now
+	public Representation represent(Variant variant)
+	{
+		System.out.println("StopServiceResource.represent()");
+		StringRepresentation presentation = null;
+		try
+		{
+			logger.debug("in represent, stopping service");
+			Component component = (Component)_context.getAttributes().get(MockEspressoService.COMPONENT_NAME);
+			EspressoStorageMockNode mock = (EspressoStorageMockNode)_context.getAttributes().get(MockEspressoService.CONTEXT_MOCK_NODE_NAME);
+			presentation = new StringRepresentation("Stopping in 1 second", MediaType.APPLICATION_JSON);
+			Thread stopper = new Thread(new StopThread(component, mock));
+			stopper.start();
+		}
+		catch (Exception e)
+		{
+			String error = "Error shutting down";
+			presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
+			e.printStackTrace();
+		}
+		return presentation;
+	}
+
+	public void storeRepresentation(Representation entity) throws ResourceException {
+
+	}
+
+}
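
Because the service routes the single-segment /{stopservice} template to this resource, a plain HTTP GET is enough to shut the mock service down after its one-second delay, e.g. (assuming the default port):

    curl http://localhost:8100/stopservice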

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/test/java/org/apache/helix/AppTest.java
----------------------------------------------------------------------
diff --git a/mockservice/src/test/java/org/apache/helix/AppTest.java b/mockservice/src/test/java/org/apache/helix/AppTest.java
new file mode 100644
index 0000000..8efe7b4
--- /dev/null
+++ b/mockservice/src/test/java/org/apache/helix/AppTest.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.testng.annotations.Test;
+import org.testng.AssertJUnit;
+
+/**
+ * Unit test for simple App.
+ */
+public class AppTest
+{
+    /**
+     * Create the test case
+     *
+     * @param testName name of the test case
+     */
+    public AppTest( String testName )
+    {
+    }
+
+    /**
+     * Rigorous Test :-)
+     */
+    @Test
+    public void testApp()
+    {
+        AssertJUnit.assertTrue( true );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
new file mode 100644
index 0000000..73e7a75
--- /dev/null
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
@@ -0,0 +1,130 @@
+package org.apache.helix.recipes.rabbitmq;
+
+import java.util.List;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+
+public class Consumer
+{
+  private final String _zkAddr;
+  private final String _clusterName;
+  private final String _consumerId;
+  private final String _mqServer;
+  private HelixManager _manager = null;
+
+  public Consumer(String zkAddr, String clusterName, String consumerId, String mqServer)
+  {
+    _zkAddr = zkAddr;
+    _clusterName = clusterName;
+    _consumerId = consumerId;
+    _mqServer = mqServer;
+  }
+
+  public void connect()
+  {
+    try
+    {
+      _manager =
+          HelixManagerFactory.getZKHelixManager(_clusterName,
+                                                _consumerId,
+                                                InstanceType.PARTICIPANT,
+                                                _zkAddr);
+
+      StateMachineEngine stateMach = _manager.getStateMachineEngine();
+      ConsumerStateModelFactory modelFactory =
+          new ConsumerStateModelFactory(_consumerId, _mqServer);
+      stateMach.registerStateModelFactory(SetupConsumerCluster.DEFAULT_STATE_MODEL, modelFactory);
+
+      _manager.connect();
+
+      Thread.currentThread().join();
+    }
+    catch (InterruptedException e)
+    {
+      System.err.println(" [-] " + _consumerId + " is interrupted ...");
+    }
+    catch (Exception e)
+    {
+      e.printStackTrace();
+    }
+    finally
+    {
+      disconnect();
+    }
+  }
+
+  public void disconnect()
+  {
+    if (_manager != null)
+    {
+      _manager.disconnect();
+    }
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    if (args.length < 3)
+    {
+      System.err.println("USAGE: java Consumer zookeeperAddress (e.g. localhost:2181) consumerId (0-2), rabbitmqServer (e.g. localhost)");
+      System.exit(1);
+    }
+
+    final String zkAddr = args[0];
+    final String clusterName = SetupConsumerCluster.DEFAULT_CLUSTER_NAME;
+    final String consumerId = args[1];
+    final String mqServer = args[2];
+
+    ZkClient zkclient = null;
+    try
+    {
+      // add node to cluster if not already added
+      zkclient =
+          new ZkClient(zkAddr,
+                       ZkClient.DEFAULT_SESSION_TIMEOUT,
+                       ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+                       new ZNRecordSerializer());
+      ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
+
+      List<String> nodes = admin.getInstancesInCluster(clusterName);
+      if (!nodes.contains("consumer_" + consumerId))
+      {
+        InstanceConfig config = new InstanceConfig("consumer_" + consumerId);
+        config.setHostName("localhost");
+        config.setInstanceEnabled(true);
+        admin.addInstance(clusterName, config);
+      }
+
+      // start consumer
+      final Consumer consumer =
+          new Consumer(zkAddr, clusterName, "consumer_" + consumerId, mqServer);
+
+      Runtime.getRuntime().addShutdownHook(new Thread()
+      {
+        @Override
+        public void run()
+        {
+          System.out.println("Shutting down consumer_" + consumerId);
+          consumer.disconnect();
+        }
+      });
+
+      consumer.connect();
+    }
+    finally
+    {
+      if (zkclient != null)
+      {
+        zkclient.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModel.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModel.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModel.java
new file mode 100644
index 0000000..27d935e
--- /dev/null
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModel.java
@@ -0,0 +1,97 @@
+package org.apache.helix.recipes.rabbitmq;
+
+import org.apache.log4j.Logger;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+
+@StateModelInfo(initialState = "OFFLINE", states = { "ONLINE", "ERROR" })
+public class ConsumerStateModel extends StateModel
+{
+  private static Logger LOG = Logger.getLogger(ConsumerStateModel.class);
+
+  private final String _consumerId;
+  private final String _partition;
+
+  private final String _mqServer;
+  private ConsumerThread _thread = null;
+
+  public ConsumerStateModel(String consumerId, String partition, String mqServer)
+  {
+    _partition = partition;
+    _consumerId = consumerId;
+    _mqServer = mqServer;
+  }
+
+  @Transition(to = "ONLINE", from = "OFFLINE")
+  public void onBecomeOnlineFromOffline(Message message, NotificationContext context)
+  {
+    LOG.debug(_consumerId + " becomes ONLINE from OFFLINE for " + _partition);
+
+    if (_thread == null)
+    {
+      LOG.debug("Starting ConsumerThread for " + _partition + "...");
+      _thread = new ConsumerThread(_partition, _mqServer, _consumerId);
+      _thread.start();
+      LOG.debug("Starting ConsumerThread for " + _partition + " done");
+
+    }
+  }
+
+  @Transition(to = "OFFLINE", from = "ONLINE")
+  public void onBecomeOfflineFromOnline(Message message, NotificationContext context)
+      throws InterruptedException
+  {
+    LOG.debug(_consumerId + " becomes OFFLINE from ONLINE for " + _partition);
+
+    if (_thread != null)
+    {
+      LOG.debug("Stopping " + _consumerId + " for " + _partition + "...");
+
+      _thread.interrupt();
+      _thread.join(2000);
+      _thread = null;
+      LOG.debug("Stopping " +  _consumerId + " for " + _partition + " done");
+
+    }
+  }
+
+  @Transition(to = "DROPPED", from = "OFFLINE")
+  public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
+  {
+    LOG.debug(_consumerId + " becomes DROPPED from OFFLINE for " + _partition);
+  }
+
+  @Transition(to = "OFFLINE", from = "ERROR")
+  public void onBecomeOfflineFromError(Message message, NotificationContext context)
+  {
+    LOG.debug(_consumerId + " becomes OFFLINE from ERROR for " + _partition);
+  }
+
+  @Override
+  public void reset()
+  {
+    LOG.warn("Default reset() invoked");
+    
+    if (_thread != null)
+    {
+      LOG.debug("Stopping " + _consumerId + " for " + _partition + "...");
+
+      _thread.interrupt();
+      try
+      {
+        _thread.join(2000);
+      } catch (InterruptedException e)
+      {
+        // restore the interrupt status; reset() is best-effort cleanup
+        Thread.currentThread().interrupt();
+      }
+      _thread = null;
+      LOG.debug("Stopping " +  _consumerId + " for " + _partition + " done");
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
new file mode 100644
index 0000000..673c7fd
--- /dev/null
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
@@ -0,0 +1,21 @@
+package org.apache.helix.recipes.rabbitmq;
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+public class ConsumerStateModelFactory extends StateModelFactory<ConsumerStateModel>
+{
+  private final String _consumerId;
+  private final String _mqServer;
+  public ConsumerStateModelFactory(String consumerId, String mqServer)
+  {
+    _consumerId = consumerId;
+    _mqServer = mqServer;
+  }
+  
+  @Override
+  public ConsumerStateModel createNewStateModel(String partition)
+  {
+    ConsumerStateModel model = new ConsumerStateModel(_consumerId, partition, _mqServer);
+    return model;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java
new file mode 100644
index 0000000..0a6ed5c
--- /dev/null
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java
@@ -0,0 +1,76 @@
+package org.apache.helix.recipes.rabbitmq;
+
+import java.io.IOException;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.QueueingConsumer;
+
+public class ConsumerThread extends Thread
+{
+  private static final String EXCHANGE_NAME = "topic_logs";
+  private final String _partition;
+  private final String _mqServer;
+  private final String _consumerId;
+  
+  public ConsumerThread(String partition, String mqServer, String consumerId)
+  {
+    _partition = partition;
+    _mqServer = mqServer;
+    _consumerId = consumerId;
+  }
+
+  @Override
+  public void run()
+  {
+    Connection connection = null;
+    try
+    {
+      ConnectionFactory factory = new ConnectionFactory();
+      factory.setHost(_mqServer);
+      connection = factory.newConnection();
+      Channel channel = connection.createChannel();
+
+      channel.exchangeDeclare(EXCHANGE_NAME, "topic");
+      String queueName = channel.queueDeclare().getQueue();
+
+      String bindingKey = _partition;
+      channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
+
+      System.out.println(" [*] " + _consumerId + " Waiting for messages on " + bindingKey + ". To exit press CTRL+C");
+
+      QueueingConsumer consumer = new QueueingConsumer(channel);
+      channel.basicConsume(queueName, true, consumer);
+
+      while (true)
+      {
+        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
+        String message = new String(delivery.getBody());
+        String routingKey = delivery.getEnvelope().getRoutingKey();
+
+        System.out.println(" [x] " + _consumerId + " Received '" + routingKey + "':'" + message + "'");
+      }
+    } catch (InterruptedException e)
+    {
+      System.err.println(" [-] " + _consumerId + " on " + _partition + " is interrupted ...");
+    }
+    catch (Exception e)
+    {
+      e.printStackTrace();
+    } finally
+    {
+      if (connection != null)
+      {
+        try
+        {
+          connection.close();
+        } catch (IOException e)
+        {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Emitter.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Emitter.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Emitter.java
new file mode 100644
index 0000000..a91523a
--- /dev/null
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Emitter.java
@@ -0,0 +1,56 @@
+package org.apache.helix.recipes.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+public class Emitter
+{
+
+  private static final String EXCHANGE_NAME = "topic_logs";
+
+  public static void main(String[] args) throws Exception
+  {
+    if (args.length < 1)
+    {
+      System.err.println("USAGE: java Emitter rabbitmqServer (e.g. localhost) numberOfMessage (optional)");
+      System.exit(1);
+    }
+    
+    final String mqServer = args[0];
+    int count = Integer.MAX_VALUE;
+    if (args.length > 1)
+    {
+      try
+      {
+        count = Integer.parseInt(args[1]);
+      } catch (NumberFormatException e) {
+        System.err.println("Invalid message count '" + args[1] + "'; sending " + count + " messages");
+      }
+    }
+    System.out.println("Sending " + count + " messages with random topic id");
+    
+
+    ConnectionFactory factory = new ConnectionFactory();
+    factory.setHost(mqServer);
+    Connection connection = factory.newConnection();
+    Channel channel = connection.createChannel();
+
+    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
+
+    for (int i = 0; i < count; i++)
+    {
+      int rand = ((int) (Math.random() * 10000) % SetupConsumerCluster.DEFAULT_PARTITION_NUMBER);
+      String routingKey = "topic_" + rand;
+      String message = "message_" + rand;
+
+      channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
+      System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
+      
+      Thread.sleep(1000);
+    }
+    
+    connection.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/SetupConsumerCluster.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/SetupConsumerCluster.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/SetupConsumerCluster.java
new file mode 100644
index 0000000..fa01af4
--- /dev/null
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/SetupConsumerCluster.java
@@ -0,0 +1,58 @@
+package org.apache.helix.recipes.rabbitmq;
+
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+
+public class SetupConsumerCluster
+{
+  public static final String DEFAULT_CLUSTER_NAME = "rabbitmq-consumer-cluster";
+  public static final String DEFAULT_RESOURCE_NAME = "topic";
+  public static final int DEFAULT_PARTITION_NUMBER = 6;
+  public static final String DEFAULT_STATE_MODEL = "OnlineOffline";
+
+  public static void main(String[] args)
+  {
+    if (args.length < 1)
+    {
+      System.err.println("USAGE: java SetupConsumerCluster zookeeperAddress (e.g. localhost:2181)");
+      System.exit(1);
+    }
+
+    final String zkAddr = args[0];
+    final String clusterName = DEFAULT_CLUSTER_NAME;
+
+    ZkClient zkclient = null;
+    try
+    {
+      zkclient = new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
+          ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+      ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
+
+      // add cluster
+      admin.addCluster(clusterName, true);
+
+      // add state model definition
+      StateModelConfigGenerator generator = new StateModelConfigGenerator();
+      admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL,
+          new StateModelDefinition(generator.generateConfigForOnlineOffline()));
+
+      // add resource "topic" which has 6 partitions
+      String resourceName = DEFAULT_RESOURCE_NAME;
+      admin.addResource(clusterName, resourceName, DEFAULT_PARTITION_NUMBER, DEFAULT_STATE_MODEL,
+          IdealStateModeProperty.AUTO_REBALANCE.toString());
+      
+      admin.rebalance(clusterName, resourceName, 1);
+
+    } finally
+    {
+      if (zkclient != null)
+      {
+        zkclient.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/StartClusterManager.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/StartClusterManager.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/StartClusterManager.java
new file mode 100644
index 0000000..5f43b53
--- /dev/null
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/StartClusterManager.java
@@ -0,0 +1,42 @@
+package org.apache.helix.recipes.rabbitmq;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.HelixControllerMain;
+
+public class StartClusterManager
+{
+  public static void main(String[] args)
+  {
+    if (args.length < 1)
+    {
+      System.err.println("USAGE: java StartClusterManager zookeeperAddress (e.g. localhost:2181)");
+      System.exit(1);
+    }
+    
+    final String clusterName = SetupConsumerCluster.DEFAULT_CLUSTER_NAME;
+    final String zkAddr = args[0];
+    
+    try
+    {
+      final HelixManager manager = HelixControllerMain.startHelixController(zkAddr, clusterName, null,
+                                                        HelixControllerMain.STANDALONE);
+      
+      Runtime.getRuntime().addShutdownHook(new Thread()
+      {
+        @Override
+        public void run()
+        {
+          System.out.println("Shutting down cluster manager: " + manager.getInstanceName());
+          manager.disconnect();
+        }
+      });
+      
+      Thread.currentThread().join();
+    }
+    catch (Exception e)
+    {
+      e.printStackTrace();
+    }
+  }
+}
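
Putting the recipe together, the USAGE strings above imply a run sequence along these lines (addresses, consumer id, and message count are example values):

    java org.apache.helix.recipes.rabbitmq.SetupConsumerCluster localhost:2181
    java org.apache.helix.recipes.rabbitmq.StartClusterManager localhost:2181
    java org.apache.helix.recipes.rabbitmq.Consumer localhost:2181 0 localhost
    java org.apache.helix.recipes.rabbitmq.Emitter localhost 20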