You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:59 UTC
[42/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/EspressoStorageMockNode.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/EspressoStorageMockNode.java b/mockservice/src/main/java/org/apache/helix/EspressoStorageMockNode.java
new file mode 100644
index 0000000..d172d58
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/EspressoStorageMockNode.java
@@ -0,0 +1,194 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+//import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+//import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+//import java.util.concurrent.ConcurrentMap;
+
+import org.apache.helix.EspressoStorageMockStateModelFactory;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.healthcheck.PerformanceHealthReportProvider;
+import org.apache.helix.healthcheck.StatHealthReportProvider;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.log4j.Logger;
+
+public class EspressoStorageMockNode extends MockNode {
+
+ private static final Logger logger = Logger
+ .getLogger(EspressoStorageMockNode.class);
+ // Stat/report name constants; no live code in this class reads them.
+ private final String GET_STAT_NAME = "get";
+ private final String SET_STAT_NAME = "set";
+ private final String COUNT_STAT_TYPE = "count";
+ private final String REPORT_NAME = "ParticipantStats";
+ // Provider that receives the getCount/putCount stats from doGet/doPut.
+ StatHealthReportProvider _healthProvider;
+ //PerformanceHealthReportProvider _healthProvider;
+ EspressoStorageMockStateModelFactory _stateModelFactory;
+ // Partition-name set; within this class only PartitionGetterThread fills or clears it.
+ HashSet<String>_partitions;
+ // In-memory key/value store read by doGet and written by doPut.
+ ConcurrentHashMap<String, String> _keyValueMap;
+ FnvHashFunction _hashFunction;
+ int _numTotalEspressoPartitions = 0;
+
+ /**
+  * Creates the mock storage node and wires it into the manager exposed by
+  * the given connector.
+  *
+  * Construction registers a state model factory (created with delay 0) for
+  * the "MasterSlave" state model, adds a {@link StatHealthReportProvider} to
+  * the manager's health report collector, and creates the empty partition
+  * set, the in-memory key/value map and the hash function.
+  *
+  * No {@link PartitionGetterThread} is started here, so nothing fills
+  * {@code _partitions} after construction.
+  *
+  * @param cm connector whose manager this node registers with
+  */
+ public EspressoStorageMockNode(CMConnector cm) {
+   super(cm);
+
+   // State transitions for "MasterSlave" go through this factory.
+   this._stateModelFactory = new EspressoStorageMockStateModelFactory(0);
+   _cmConnector.getManager().getStateMachineEngine()
+       .registerStateModelFactory("MasterSlave", this._stateModelFactory);
+
+   // Health reporting: create the provider, then hand it to the collector.
+   this._healthProvider = new StatHealthReportProvider();
+   _cmConnector.getManager().getHealthReportCollector()
+       .addHealthReportProvider(this._healthProvider);
+
+   // Local bookkeeping used by doGet/doPut and the partition getter.
+   this._partitions = new HashSet<String>();
+   this._keyValueMap = new ConcurrentHashMap<String, String>();
+   this._hashFunction = new FnvHashFunction();
+ }
+
+ /**
+  * Builds the stat name {@code "db" + dbName + ".partition" + partitionName + "." + metricName}.
+  */
+ public String formStatName(String dbName, String partitionName, String metricName)
+ {
+   return "db" + dbName + ".partition" + partitionName + "." + metricName;
+ }
+
+ /** Returns the value stored for key, or null if it is absent or (after an error log) this node lacks the key's partition. */
+ public String doGet(String dbId, String key) {
+   String owner = getPartitionName(dbId, getKeyPartition(dbId, key));
+   if (isPartitionOwnedByNode(owner)) {
+     // Owned partition: count the read, then serve it from the in-memory map.
+     String statName = formStatName(dbId, owner, "getCount");
+     _healthProvider.incrementStat(statName, String.valueOf(System.currentTimeMillis()));
+     return _keyValueMap.get(key);
+   }
+   logger.error("Key "+key+" hashed to partition "+owner +" but this node does not own it.");
+   return null;
+ }
+
+ /**
+  * Stores value under key when this node has a state model for the key's
+  * partition, counting the write as "putCount"; otherwise logs an error and
+  * stores nothing.
+  */
+ public void doPut(String dbId, String key, String value) {
+   String owner = getPartitionName(dbId, getKeyPartition(dbId, key));
+   if (isPartitionOwnedByNode(owner)) {
+     String statName = formStatName(dbId, owner, "putCount");
+     _healthProvider.incrementStat(statName, String.valueOf(System.currentTimeMillis()));
+     _keyValueMap.put(key, value);
+   } else {
+     logger.error("Key "+key+" hashed to partition "+owner +" but this node does not own it.");
+   }
+ }
+
+ /** Names a partition by joining the database name and partition number with an underscore. */
+ private String getPartitionName(String databaseName, int partitionNum) {
+   return new StringBuilder().append(databaseName).append('_').append(partitionNum).toString();
+ }
+ /** True when the state model factory currently holds a state model keyed by partitionName. */
+ private boolean isPartitionOwnedByNode(String partitionName) {
+   Map<String, StateModel> models = _stateModelFactory.getStateModelMap();
+   logger.debug("state model map size: "+models.size());
+   boolean owned = models.containsKey(partitionName);
+   return owned;
+ }
+
+ /** Maps key to a partition number of dbName: the absolute value of the bucket returned by _hashFunction. */
+ private int getKeyPartition(String dbName, String key) {
+   int bucketCount = getNumPartitions(dbName);
+   logger.debug("numPartitions: "+bucketCount);
+   long bucket = _hashFunction.hash(key.getBytes(), bucketCount);
+   logger.debug("part: "+Math.abs((int) bucket));
+   return Math.abs((int) bucket);
+ }
+ /** Returns the partition count from dbName's ideal state; throws IllegalStateException if none is stored. */
+ private int getNumPartitions(String dbName) {
+   logger.debug("dbName: "+dbName);
+   HelixDataAccessor helixDataAccessor = _cmConnector.getManager().getHelixDataAccessor();
+   Builder keyBuilder = helixDataAccessor.keyBuilder();
+   IdealState state = helixDataAccessor.getProperty(keyBuilder.idealStates(dbName));
+   if (state == null || state.getRecord() == null) {
+     // Fail with a clear message rather than a NullPointerException further down.
+     throw new IllegalStateException("No ideal state found for database " + dbName);
+   }
+   return state.getNumPartitions();
+ }
+ /**
+  * Rebuilds {@code _partitions} from the state model factory's keys once a
+  * minute until the executing thread is interrupted.
+  */
+ class PartitionGetterThread implements Runnable {
+   @Override
+   public void run() {
+     while (!Thread.currentThread().isInterrupted()) {
+       synchronized (_partitions) {
+         _partitions.clear();
+         Map<String, StateModel> stateModelMap = _stateModelFactory.getStateModelMap();
+         for (String s: stateModelMap.keySet()) {
+           logger.debug("adding key "+s);
+           _partitions.add(s);
+         }
+       }
+       try {
+         Thread.sleep(60000); // wait 60 seconds before the next refresh
+       } catch (InterruptedException e) {
+         // Re-assert the interrupt so the loop condition stops this thread.
+         Thread.currentThread().interrupt();
+       }
+     }
+   }
+ }
+
+ @Override
+ public void run() {
+ // No-op: the body is empty, so running this node does nothing.
+ }
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/EspressoStorageMockStateModelFactory.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/EspressoStorageMockStateModelFactory.java b/mockservice/src/main/java/org/apache/helix/EspressoStorageMockStateModelFactory.java
new file mode 100644
index 0000000..2a807db
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/EspressoStorageMockStateModelFactory.java
@@ -0,0 +1,99 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * Factory for {@link EspressoStorageMockStateModel} instances, whose state
+ * transitions only print a message and then sleep for a configurable delay.
+ */
+@SuppressWarnings("rawtypes")
+public class EspressoStorageMockStateModelFactory extends StateModelFactory<StateModel> {
+  int _delay;
+
+  /** @param delay value passed to {@code setDelay} of each created state model */
+  public EspressoStorageMockStateModelFactory(int delay) {
+    _delay = delay;
+  }
+
+  /** Creates a state model for stateUnitKey that uses this factory's delay. */
+  @Override
+  public StateModel createNewStateModel(String stateUnitKey) {
+    EspressoStorageMockStateModel stateModel = new EspressoStorageMockStateModel();
+    stateModel.setDelay(_delay);
+    stateModel.setStateUnitKey(stateUnitKey);
+    return stateModel;
+  }
+
+  /** Mock state model: every transition prints its name and key, then sleeps. */
+  public static class EspressoStorageMockStateModel extends StateModel {
+    int _transDelay = 0;
+    String stateUnitKey;
+
+    public String getStateUnitKey() {
+      return stateUnitKey;
+    }
+
+    public void setStateUnitKey(String stateUnitKey) {
+      this.stateUnitKey = stateUnitKey;
+    }
+
+    /** Sets the sleep per transition in milliseconds; values below 0 become 0. */
+    public void setDelay(int delay) {
+      _transDelay = delay > 0 ? delay : 0;
+    }
+
+    public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
+      System.out.println("EspressoStorageMockStateModel.onBecomeSlaveFromOffline() for "+ stateUnitKey);
+      sleep();
+    }
+
+    public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
+      System.out.println("EspressoStorageMockStateModel.onBecomeSlaveFromMaster() for "+ stateUnitKey);
+      sleep();
+    }
+
+    public void onBecomeMasterFromSlave(Message message, NotificationContext context) {
+      System.out.println("EspressoStorageMockStateModel.onBecomeMasterFromSlave() for "+ stateUnitKey);
+      sleep();
+    }
+
+    public void onBecomeOfflineFromSlave(Message message, NotificationContext context) {
+      System.out.println("EspressoStorageMockStateModel.onBecomeOfflineFromSlave() for "+ stateUnitKey);
+      sleep();
+    }
+
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+      System.out.println("OnBecomeDroppedFromOffline() for "+ stateUnitKey);
+      sleep();
+    }
+
+    /** Sleeps for the configured delay; an interrupt ends the sleep early and is
+     *  re-asserted on the thread instead of being swallowed. */
+    private void sleep() {
+      try {
+        Thread.sleep(_transDelay);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        e.printStackTrace();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/FnvHashFunction.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/FnvHashFunction.java b/mockservice/src/main/java/org/apache/helix/FnvHashFunction.java
new file mode 100644
index 0000000..4866a3f
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/FnvHashFunction.java
@@ -0,0 +1,201 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.nio.ByteBuffer;
+
+public class FnvHashFunction implements HashFunction
+{
+ private static final long FNV_BASIS = 0x811c9dc5; // NOTE(review): int literal; widens to the negative long 0xFFFFFFFF811C9DC5L
+ private static final long FNV_PRIME = (1 << 24) + 0x193; // = 16777619
+
+ /** Hashes the bytes of buf from index 0 up to its limit (position + remaining). */
+ @Override
+ public long hash(ByteBuffer buf)
+ {
+   return hash(buf, 0, buf.limit());
+ }
+
+ /**
+  * Hashes the bytes at indices off (inclusive) to off + len (exclusive),
+  * clipped to the buffer's limit, using absolute reads.
+  */
+ @Override
+ public long hash(ByteBuffer buf, int off, int len)
+ {
+   final int end = Math.min(off + len, buf.limit());
+   long acc = FNV_BASIS;
+   for (int idx = off; idx < end; idx++) {
+     acc = (acc ^ (0xFF & buf.get(idx))) * FNV_PRIME;
+   }
+   return acc;
+ }
+
+ /** Hashes every byte of key, in order, starting from FNV_BASIS. */
+ public long hash(byte[] key)
+ {
+   long acc = FNV_BASIS;
+   for (byte b : key) {
+     acc ^= (b & 0xFF);
+     acc *= FNV_PRIME;
+   }
+   return acc;
+ }
+
+ /** Returns hash(key) % numBuckets; Java's % keeps the dividend's sign, so the result can be negative. */
+ @Override
+ public long hash(byte[] key, int numBuckets) {
+   final long full = hash(key);
+   return full % numBuckets;
+ }
+
+ /**
+  * Hashes the eight bytes of val, least-significant byte first, then
+  * truncates the result to an int before widening it back to long.
+  */
+ private long hash(long val)
+ {
+   long acc = FNV_BASIS;
+   // Shifting by 0, 8, ..., 56 visits the same octets as repeated val >> 8.
+   for (int shift = 0; shift < 64; shift += 8) {
+     final long octet = (val >> shift) & 0xFF;
+     acc = (acc ^ octet) * FNV_PRIME;
+   }
+   return (int) acc;
+ }
+
+ /** Returns the int-truncated hash of val modulo numBuckets; the result can be negative. */
+ @Override
+ public long hash(long val, int numBuckets) {
+   final long full = hash(val);
+   return full % numBuckets;
+ }
+ /*
+ public static void main(String[] args)
+ {
+ byte[] b = new byte[1024*1024*100];
+ ByteBuffer buf = ByteBuffer.allocateDirect(1024*1024*100).order(DbusEvent.byteOrder);
+ Random r = new Random();
+ r.nextBytes(b);
+ buf.put(b);
+
+ FnvHashFunction fun = new FnvHashFunction();
+ CRC32 chksum = new CRC32();
+ JenkinsHashFunction jFun = new JenkinsHashFunction();
+
+ long start = 0;
+ long end = 0;
+ long hash = 0;
+ long diff = 0;
+ long delayMicro = 0;
+
+ chksum.reset();
+ chksum.update(b);
+ long prevhash = chksum.getValue();
+ for (int i = 0; i < 10; i++)
+ {
+ start = System.nanoTime();
+ chksum.reset();
+ chksum.update(b);
+ hash = chksum.getValue();
+ end = System.nanoTime();
+ assert(prevhash == hash);
+ diff += (end - start);
+ }
+
+ delayMicro = (diff/1000)/10;
+
+ System.out.println("Latency of System CRC32 (Micro Seconds) is: " + delayMicro);
+
+ prevhash = fun.hash(b);
+ for (int i = 0; i < 10; i++)
+ {
+ start = System.nanoTime();
+ hash = fun.hash(b);
+ end = System.nanoTime();
+ assert(prevhash == hash);
+ diff += (end - start);
+ }
+ delayMicro = (diff/1000)/10;
+ System.out.println("Latency of FNV (Micro Seconds) is: " + delayMicro);
+
+ prevhash = jFun.hash(b);
+ for (int i = 0; i < 10; i++)
+ {
+ start = System.nanoTime();
+ hash = jFun.hash(b);
+ end = System.nanoTime();
+ assert(prevhash == hash);
+ diff += (end - start);
+ }
+ delayMicro = (diff/1000)/10;
+ System.out.println("Latency of Jenkins (Micro Seconds) is: " + delayMicro);
+
+ prevhash = ByteBufferCRC32.getChecksum(b);
+ for (int i = 0; i < 10; i++)
+ {
+ start = System.nanoTime();
+ hash = ByteBufferCRC32.getChecksum(b);
+ end = System.nanoTime();
+ assert(prevhash == hash);
+ diff += (end - start);
+ }
+ delayMicro = (diff/1000)/10;
+ System.out.println("Latency of ByteBufferCRC32 (Micro Seconds) is: " + delayMicro);
+
+ //System.out.println("Buffer position-Remaining :" + buf.position() + "-" + buf.remaining());
+
+ prevhash = fun.hash(buf);
+ for (int i = 0; i < 10; i++)
+ {
+ start = System.nanoTime();
+ hash = fun.hash(buf);
+ end = System.nanoTime();
+ assert(prevhash == hash);
+ diff += (end - start);
+ }
+ delayMicro = (diff/1000)/10;
+ System.out.println("Latency of FNV (Micro Seconds) for ByteBuffer is: " + delayMicro);
+ //System.out.println("Buffer position-Remaining :" + buf.position() + "-" + buf.remaining());
+
+ prevhash = fun.hash(buf);
+ for (int i = 0; i < 10; i++)
+ {
+ start = System.nanoTime();
+ hash = fun.hash(buf);
+ end = System.nanoTime();
+ assert(prevhash == hash);
+ diff += (end - start);
+ }
+ delayMicro = (diff/1000)/10;
+ System.out.println("Latency of Jenkins (Micro Seconds) for ByteBuffer is: " + delayMicro);
+ //System.out.println("Buffer position-Remaining :" + buf.position() + "-" + buf.remaining());
+ prevhash = ByteBufferCRC32.getChecksum(buf);
+ for (int i = 0; i < 10; i++)
+ {
+ start = System.nanoTime();
+ hash = ByteBufferCRC32.getChecksum(buf);
+ end = System.nanoTime();
+ assert(prevhash == hash);
+ diff += (end - start);
+ }
+ delayMicro = (diff/1000)/10;
+ System.out.println("Latency of ByteBufferCRC32 (Micro Seconds) for ByteBuffer is: " + delayMicro);
+
+ //System.out.println("Buffer position-Remaining :" + buf.position() + "-" + buf.remaining());
+ }
+ */
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/HashFunction.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/HashFunction.java b/mockservice/src/main/java/org/apache/helix/HashFunction.java
new file mode 100644
index 0000000..1641212
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/HashFunction.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Hash functions over byte buffers, byte arrays and long keys.
+ * Forked from com.linkedin.databus.core.util @ r293057
+ * @author sdas
+ */
+public interface HashFunction {
+
+
+ /**
+ * Generates a hash for the entire byte buffer.
+ * @param buf ByteBuffer for which the hash is computed
+ * @return hash value of the buffer
+ */
+ public long hash(ByteBuffer buf);
+
+ /**
+ * Generates a hash for a section of the byte buffer, given by a starting
+ * offset and a length.
+ *
+ * @param buf ByteBuffer for which the hash is computed
+ * @param off starting offset of the section
+ * @param len length of the section
+ * @return the hash value for the section of the buffer
+ */
+ public long hash(ByteBuffer buf, int off, int len);
+
+ /**
+ * Generates a hash for the byte array and bucketizes the value to
+ * 0..(numBuckets - 1).
+ * NOTE(review): FnvHashFunction applies % directly and can return a negative value.
+ * @param key array to hash and bucketize
+ * @param numBuckets number of buckets
+ *
+ * @return the bucket in the range 0..(numBuckets - 1)
+ */
+ public long hash(byte[] key, int numBuckets);
+
+ /**
+ * Generates a hash for the key and bucketizes the value to
+ * 0..(numBuckets - 1).
+ * NOTE(review): FnvHashFunction applies % directly and can return a negative value.
+ * @param key input key to hash
+ * @param numBuckets number of buckets
+ *
+ * @return the bucket in the range 0..(numBuckets - 1)
+ */
+ public long hash(long key, int numBuckets);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/MockEspressoService.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/MockEspressoService.java b/mockservice/src/main/java/org/apache/helix/MockEspressoService.java
new file mode 100644
index 0000000..d934a44
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/MockEspressoService.java
@@ -0,0 +1,269 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.net.InetAddress;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.log4j.Logger;
+import org.restlet.Application;
+import org.restlet.Component;
+import org.restlet.Context;
+import org.restlet.data.Request;
+import org.restlet.Restlet;
+import org.restlet.data.MediaType;
+import org.restlet.data.Protocol;
+import org.restlet.resource.StringRepresentation;
+import org.restlet.Router;
+
+import org.restlet.data.Response;
+
+
+public class MockEspressoService extends Application
+{
+ private static final Logger logger = Logger.getLogger(MockEspressoService.class);
+ // Command-line option names, the default port and the mock node type.
+ public static final String HELP = "help";
+ public static final String CLUSTERNAME = "clusterName";
+ public static final String INSTANCENAME = "instanceName";
+ public static final String ZKSERVERADDRESS = "zkSvr";
+ public static final String PORT = "port";
+ public static final int DEFAULT_PORT = 8100;
+ protected static final String NODE_TYPE = "EspressoStorage";
+ //protected static final String INSTANCE_NAME = "localhost_1234";
+ // Names of the URI template variables used by createRoot().
+ public static final String DATABASENAME = "database";
+ public static final String TABLENAME = "table";
+ public static final String RESOURCENAME = "resource";
+ public static final String SUBRESOURCENAME = "subresource";
+ public static final String STOPSERVICECOMMAND = "stopservice";
+ // Keys under which the mock node and the component are stored in context attributes.
+ public static final String CONTEXT_MOCK_NODE_NAME = "mocknode";
+ public static final String COMPONENT_NAME = "component";
+ // Runtime state; the static fields are set by processCommandLineArgs(), main() and run().
+ Context _applicationContext;
+ static int _serverPort;
+ static String _zkAddr = "localhost:9999";
+ static String _instanceName = "localhost";
+ static String _clusterName = "";
+ public CMConnector _connector;
+ public EspressoStorageMockNode _mockNode;
+ static Context _context;
+ static Component _component;
+
+ /**
+  * Connects to the cluster and stores the mock node in {@code context} under
+  * CONTEXT_MOCK_NODE_NAME; exits the JVM (-1) if the connector cannot be built.
+  */
+ public MockEspressoService(Context context) {
+   // Use the caller's context; the static _context may be unset or different.
+   super(context);
+   try {
+     _connector = new CMConnector(_clusterName, _instanceName, _zkAddr);
+   } catch (Exception e) {
+     logger.error("Unable to initialize CMConnector: "+e);
+     e.printStackTrace();
+     System.exit(-1);
+   }
+   _mockNode = (EspressoStorageMockNode)MockNodeFactory.createMockNode(NODE_TYPE, _connector);
+   context.getAttributes().put(CONTEXT_MOCK_NODE_NAME, (Object)_mockNode);
+ }
+
+ /**
+  * Builds the router: "" serves a static HTML page, the three- and
+  * four-segment template paths go to EspressoResource, and the one-segment
+  * template path goes to StopServiceResource.
+  * @return the router with all routes attached
+  */
+ @Override
+ public Restlet createRoot()
+ {
+   Router router = new Router(_context);
+   // The landing page markup never changes, so it is assembled once here.
+   final String landingHtml = "<html>"
+       + "<head><title>Restlet Cluster Management page</title></head>"
+       + "<body bgcolor=white>"
+       + "<table border=\"0\">"
+       + "<tr>"
+       + "<td>"
+       + "<h1>Rest cluster management interface V1</h1>"
+       + "</td>"
+       + "</tr>"
+       + "</table>"
+       + "</body>"
+       + "</html>";
+   Restlet mainpage = new Restlet()
+   {
+     @Override
+     public void handle(Request request, Response response)
+     {
+       response.setEntity(new StringRepresentation(landingHtml, MediaType.TEXT_HTML));
+     }
+   };
+   if (_mockNode == null) {
+     logger.debug("_mockNode in createRoot is null");
+   }
+   router.attach("", mainpage);
+   // Espresso handlers
+   final String resourcePath = "/{"+DATABASENAME+"}/{"+TABLENAME+"}/{"+RESOURCENAME+"}";
+   router.attach(resourcePath, EspressoResource.class);
+   router.attach(resourcePath+"/{"+SUBRESOURCENAME+"}", EspressoResource.class);
+   // Admin handlers
+   router.attach("/{"+STOPSERVICECOMMAND+"}", StopServiceResource.class);
+   return router;
+ }
+
+ /** Prints usage help for the given options, titled "java " plus this class's name. */
+ public static void printUsage(Options cliOptions) {
+   String commandLine = "java " + MockEspressoService.class.getName();
+   new HelpFormatter().printHelp(commandLine, cliOptions);
+ }
+
+ /**
+  * Declares the command-line options of this service.
+  *
+  * help takes no argument; zkSvr and clusterName are required and take one
+  * argument; instanceName and port are optional and take one argument.
+  *
+  * @return the options, added in the order help, zkSvr, clusterName, instanceName, port
+  */
+ private static Options constructCommandLineOptions()
+ {
+   Option helpOption = newOption(HELP,
+       "Prints command-line options info", 0, false, "print help message");
+   Option zkServerOption = newOption(ZKSERVERADDRESS,
+       "Provide zookeeper address", 1, true, "ZookeeperServerAddress(Required)");
+   Option clusterOption = newOption(CLUSTERNAME,
+       "Provide cluster name", 1, true, "Cluster name(Required)");
+   Option instanceOption = newOption(INSTANCENAME,
+       "Provide name for this instance", 1, false,
+       "Instance name(Optional, defaults to localhost)");
+   Option portOption = newOption(PORT,
+       "Provide web service port", 1, false, "web service port, default: "+ DEFAULT_PORT);
+
+   Options options = new Options();
+   options.addOption(helpOption);
+   options.addOption(zkServerOption);
+   options.addOption(clusterOption);
+   options.addOption(instanceOption);
+   options.addOption(portOption);
+   return options;
+ }
+
+ /** Builds one long-only option and applies its argument count, required flag and argument name. */
+ @SuppressWarnings("static-access")
+ private static Option newOption(String longOpt, String description, int args,
+     boolean required, String argName)
+ {
+   Option option = OptionBuilder.withLongOpt(longOpt).withDescription(description).create();
+   option.setArgs(args);
+   option.setRequired(required);
+   option.setArgName(argName);
+   return option;
+ }
+
+ /**
+  * Parses the command line into the static configuration fields.
+  * On a parse failure the error and usage are printed and the JVM exits
+  * with status 1. When help is requested, usage is printed and the method
+  * returns with only {@code _serverPort} reset to its default.
+  * @param cliArgs raw command-line arguments
+  * @throws Exception e.g. a NumberFormatException when the port is not an integer
+  */
+ public static void processCommandLineArgs(String[] cliArgs) throws Exception
+ {
+   Options cliOptions = constructCommandLineOptions();
+   CommandLine cmd = null;
+   try {
+     cmd = new GnuParser().parse(cliOptions, cliArgs);
+   } catch (ParseException pe) {
+     System.err.println("MockEspressoService: failed to parse command-line options: " + pe.toString());
+     printUsage(cliOptions);
+     System.exit(1);
+   }
+   _serverPort = DEFAULT_PORT;
+   if (cmd.hasOption(HELP)) {
+     printUsage(cliOptions);
+     return;
+   }
+   if (cmd.hasOption(PORT)) {
+     _serverPort = Integer.parseInt(cmd.getOptionValue(PORT));
+   }
+   if (cmd.hasOption(ZKSERVERADDRESS)) {
+     _zkAddr = cmd.getOptionValue(ZKSERVERADDRESS);
+   }
+   if (cmd.hasOption(CLUSTERNAME)) {
+     _clusterName = cmd.getOptionValue(CLUSTERNAME);
+     logger.debug("_clusterName: "+_clusterName);
+   }
+   if (cmd.hasOption(INSTANCENAME)) {
+     _instanceName = cmd.getOptionValue(INSTANCENAME);
+     _instanceName = _instanceName.replace(':', '_');
+     logger.debug("_instanceName: "+_instanceName);
+   }
+ }
+
+ /**
+  * Starts the embedded HTTP server on {@code _serverPort} with this
+  * application attached to the default host.
+  *
+  * The component is stored in {@code _context}'s attributes under
+  * COMPONENT_NAME before it is started. If no mock node was created, an
+  * error is logged and the JVM exits with status -1.
+  *
+  * @throws Exception propagated from the component calls
+  */
+ public void run() throws Exception {
+   logger.debug("Start of mock service run");
+   if (_mockNode == null) {
+     logger.debug("_mockNode null");
+     logger.error("Unknown MockNode type "+NODE_TYPE);
+     System.exit(-1);
+   } else {
+     logger.debug("_mockNode not null");
+     // Serve this application over HTTP on the configured port.
+     Component component = new Component();
+     _component = component;
+     component.getServers().add(Protocol.HTTP, _serverPort);
+     // Attach the application to the component and start it
+     component.getDefaultHost().attach(this);
+     _context.getAttributes().put(COMPONENT_NAME, (Object)component);
+     component.start();
+   }
+   logger.debug("mock service done");
+ }
+
+ /**
+  * Entry point: parses the arguments, creates the shared static context,
+  * then builds the service with that context and runs it.
+  * @param args command-line arguments, see constructCommandLineOptions()
+  * @throws Exception propagated from argument processing or run()
+  */
+ public static void main(String[] args) throws Exception
+ {
+   processCommandLineArgs(args);
+   _context = new Context();
+   new MockEspressoService(_context).run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/MockNode.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/MockNode.java b/mockservice/src/main/java/org/apache/helix/MockNode.java
new file mode 100644
index 0000000..52c35d0
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/MockNode.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.log4j.Logger;
+/** Base class for mock nodes; holds the CMConnector shared with subclasses. */
+public abstract class MockNode {
+  CMConnector _cmConnector;
+  /** @param cm connector stored in {@code _cmConnector} */
+  public MockNode(CMConnector cm) {
+    this._cmConnector = cm;
+  }
+  /** Runs the node; the behavior is defined by the subclass. */
+  public abstract void run();
+  /** Disconnects the underlying connector. */
+  public void disconnect() {
+    this._cmConnector.disconnect();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/MockNodeFactory.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/MockNodeFactory.java b/mockservice/src/main/java/org/apache/helix/MockNodeFactory.java
new file mode 100644
index 0000000..d428dae
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/MockNodeFactory.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.log4j.Logger;
+/** Creates {@link MockNode} instances by type name. */
+public class MockNodeFactory {
+  private static final Logger logger = Logger.getLogger(MockNodeFactory.class);
+  public MockNodeFactory() { }
+
+  /**
+   * Creates the mock node for type; a null or unknown type logs an error and yields null.
+   * @param type node type name; only "EspressoStorage" is known
+   * @param cm connector handed to the created node
+   * @return the new node, or null for a null or unknown type
+   */
+  public static MockNode createMockNode(String type, CMConnector cm) {
+    if ("EspressoStorage".equals(type)) {
+      return new EspressoStorageMockNode(cm);
+    }
+    logger.error("Unknown MockNode type "+type);
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/MockRunner.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/MockRunner.java b/mockservice/src/main/java/org/apache/helix/MockRunner.java
new file mode 100644
index 0000000..cf9615e
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/MockRunner.java
@@ -0,0 +1,58 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Command-line entry point that builds a {@link CMConnector} from the
+ * hard-coded cluster name, instance name and ZK address below, asks
+ * {@link MockNodeFactory} for a mock node of type {@link #nodeType} and runs it.
+ */
+public class MockRunner
+{
+
+  private static final Logger logger = Logger.getLogger(MockRunner.class);
+
+  protected static final String nodeType = "EspressoStorage";
+  protected static final String ZK_ADDR = "localhost:2184";
+  protected static final String INSTANCE_NAME = "localhost_1234";
+  protected static final String CLUSTER_NAME = "MockCluster";
+
+  /**
+   * Creates the connector and the mock node, then runs the node. Exits the JVM
+   * with status -1 when the connector cannot be created or the factory does
+   * not return a node.
+   *
+   * @param args ignored
+   */
+  public static void main( String[] args )
+  {
+    CMConnector cm = null;
+    try {
+      cm = new CMConnector(CLUSTER_NAME, INSTANCE_NAME, ZK_ADDR);
+    }
+    catch (Exception e) {
+      // Hand the throwable to the logger so the stack trace ends up in the
+      // configured log rather than only on stderr.
+      logger.error("Unable to initialize CMConnector: "+e, e);
+      System.exit(-1);
+    }
+
+    MockNode mock = MockNodeFactory.createMockNode(nodeType, cm);
+    if (mock == null) {
+      logger.error("Unknown MockNode type "+nodeType);
+      System.exit(-1);
+    }
+    mock.run();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/main/java/org/apache/helix/StopServiceResource.java
----------------------------------------------------------------------
diff --git a/mockservice/src/main/java/org/apache/helix/StopServiceResource.java b/mockservice/src/main/java/org/apache/helix/StopServiceResource.java
new file mode 100644
index 0000000..2f8d202
--- /dev/null
+++ b/mockservice/src/main/java/org/apache/helix/StopServiceResource.java
@@ -0,0 +1,141 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.List;
+
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.restlet.Component;
+import org.restlet.Context;
+import org.restlet.data.MediaType;
+import org.restlet.data.Method;
+import org.restlet.data.Request;
+import org.restlet.data.Response;
+import org.restlet.data.Status;
+import org.restlet.resource.Representation;
+import org.restlet.resource.Resource;
+import org.restlet.resource.ResourceException;
+import org.restlet.resource.StringRepresentation;
+import org.restlet.resource.Variant;
+
+
+public class StopServiceResource extends Resource {
+
+
+ private static final Logger logger = Logger
+ .getLogger(StopServiceResource.class);
+
+ Context _context;
+
+ public StopServiceResource(Context context,
+ Request request,
+ Response response)
+ {
+ super(context, request, response);
+ getVariants().add(new Variant(MediaType.TEXT_PLAIN));
+ getVariants().add(new Variant(MediaType.APPLICATION_JSON));
+ _context = context;
+ }
+
+ public boolean allowGet()
+ {
+ System.out.println("PutResource.allowGet()");
+ return true;
+ }
+
+ public boolean allowPost()
+ {
+ System.out.println("PutResource.allowPost()");
+ return false;
+ }
+
+ public boolean allowPut()
+ {
+ System.out.println("PutResource.allowPut()");
+ return false;
+ }
+
+ public boolean allowDelete()
+ {
+ return false;
+ }
+
+
+ class StopThread implements Runnable {
+
+ Component _component;
+ MockNode _mockNode;
+
+ StopThread(Component c, MockNode m) {
+ _component = c;
+ _mockNode = m;
+ }
+
+ @Override
+ public void run() {
+ try {
+ //sleep for 1 second, then end service
+ Thread.sleep(1000);
+ _component.stop();
+ _mockNode.disconnect();
+ System.exit(0);
+ } catch (Exception e) {
+ logger.error("Unable to stop service: "+e);
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ //XXX: handling both gets and puts here for now
+ public Representation represent(Variant variant)
+ {
+ System.out.println("StopServiceResource.represent()");
+ StringRepresentation presentation = null;
+ try
+ {
+ logger.debug("in represent, stopping service");
+ Component component = (Component)_context.getAttributes().get(MockEspressoService.COMPONENT_NAME);
+ EspressoStorageMockNode mock = (EspressoStorageMockNode)_context.getAttributes().get(MockEspressoService.CONTEXT_MOCK_NODE_NAME);
+ presentation = new StringRepresentation("Stopping in 1 second", MediaType.APPLICATION_JSON);
+ Thread stopper = new Thread(new StopThread(component, mock));
+ stopper.start();
+ }
+
+ catch(Exception e)
+ {
+ String error = "Error shutting down";
+ presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
+ e.printStackTrace();
+ }
+ return presentation;
+ }
+
+ public void storeRepresentation(Representation entity) throws ResourceException {
+
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/mockservice/src/test/java/org/apache/helix/AppTest.java
----------------------------------------------------------------------
diff --git a/mockservice/src/test/java/org/apache/helix/AppTest.java b/mockservice/src/test/java/org/apache/helix/AppTest.java
new file mode 100644
index 0000000..8efe7b4
--- /dev/null
+++ b/mockservice/src/test/java/org/apache/helix/AppTest.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.testng.annotations.Test;
+import org.testng.AssertJUnit;
+
+/**
+ * Placeholder TestNG test class for the mockservice module.
+ */
+public class AppTest
+{
+  /**
+   * Constructs the test instance.
+   *
+   * @param testName name of the test case (not used)
+   */
+  public AppTest( String testName )
+  {
+  }
+
+  /**
+   * Trivial check that always passes.
+   */
+  @Test
+  public void testApp()
+  {
+    final boolean alwaysTrue = true;
+    AssertJUnit.assertTrue( alwaysTrue );
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
new file mode 100644
index 0000000..73e7a75
--- /dev/null
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
@@ -0,0 +1,130 @@
+package org.apache.helix.recipes.rabbitmq;
+
+import java.util.List;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+
+public class Consumer
+{
+ private final String _zkAddr;
+ private final String _clusterName;
+ private final String _consumerId;
+ private final String _mqServer;
+ private HelixManager _manager = null;
+
+ public Consumer(String zkAddr, String clusterName, String consumerId, String mqServer)
+ {
+ _zkAddr = zkAddr;
+ _clusterName = clusterName;
+ _consumerId = consumerId;
+ _mqServer = mqServer;
+ }
+
+ public void connect()
+ {
+ try
+ {
+ _manager =
+ HelixManagerFactory.getZKHelixManager(_clusterName,
+ _consumerId,
+ InstanceType.PARTICIPANT,
+ _zkAddr);
+
+ StateMachineEngine stateMach = _manager.getStateMachineEngine();
+ ConsumerStateModelFactory modelFactory =
+ new ConsumerStateModelFactory(_consumerId, _mqServer);
+ stateMach.registerStateModelFactory(SetupConsumerCluster.DEFAULT_STATE_MODEL, modelFactory);
+
+ _manager.connect();
+
+ Thread.currentThread().join();
+ }
+ catch (InterruptedException e)
+ {
+ System.err.println(" [-] " + _consumerId + " is interrupted ...");
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ finally
+ {
+ disconnect();
+ }
+ }
+
+ public void disconnect()
+ {
+ if (_manager != null)
+ {
+ _manager.disconnect();
+ }
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ if (args.length < 3)
+ {
+ System.err.println("USAGE: java Consumer zookeeperAddress (e.g. localhost:2181) consumerId (0-2), rabbitmqServer (e.g. localhost)");
+ System.exit(1);
+ }
+
+ final String zkAddr = args[0];
+ final String clusterName = SetupConsumerCluster.DEFAULT_CLUSTER_NAME;
+ final String consumerId = args[1];
+ final String mqServer = args[2];
+
+ ZkClient zkclient = null;
+ try
+ {
+ // add node to cluster if not already added
+ zkclient =
+ new ZkClient(zkAddr,
+ ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+ new ZNRecordSerializer());
+ ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
+
+ List<String> nodes = admin.getInstancesInCluster(clusterName);
+ if (!nodes.contains("consumer_" + consumerId))
+ {
+ InstanceConfig config = new InstanceConfig("consumer_" + consumerId);
+ config.setHostName("localhost");
+ config.setInstanceEnabled(true);
+ admin.addInstance(clusterName, config);
+ }
+
+ // start consumer
+ final Consumer consumer =
+ new Consumer(zkAddr, clusterName, "consumer_" + consumerId, mqServer);
+
+ Runtime.getRuntime().addShutdownHook(new Thread()
+ {
+ @Override
+ public void run()
+ {
+ System.out.println("Shutting down consumer_" + consumerId);
+ consumer.disconnect();
+ }
+ });
+
+ consumer.connect();
+ }
+ finally
+ {
+ if (zkclient != null)
+ {
+ zkclient.close();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModel.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModel.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModel.java
new file mode 100644
index 0000000..27d935e
--- /dev/null
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModel.java
@@ -0,0 +1,97 @@
+package org.apache.helix.recipes.rabbitmq;
+
+import org.apache.log4j.Logger;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+
+@StateModelInfo(initialState = "OFFLINE", states = { "ONLINE", "ERROR" })
+public class ConsumerStateModel extends StateModel
+{
+ private static Logger LOG = Logger.getLogger(ConsumerStateModel.class);
+
+ private final String _consumerId;
+ private final String _partition;
+
+ private final String _mqServer;
+ private ConsumerThread _thread = null;
+
+ public ConsumerStateModel(String consumerId, String partition, String mqServer)
+ {
+ _partition = partition;
+ _consumerId = consumerId;
+ _mqServer = mqServer;
+ }
+
+ @Transition(to = "ONLINE", from = "OFFLINE")
+ public void onBecomeOnlineFromOffline(Message message, NotificationContext context)
+ {
+ LOG.debug(_consumerId + " becomes ONLINE from OFFLINE for " + _partition);
+
+ if (_thread == null)
+ {
+ LOG.debug("Starting ConsumerThread for " + _partition + "...");
+ _thread = new ConsumerThread(_partition, _mqServer, _consumerId);
+ _thread.start();
+ LOG.debug("Starting ConsumerThread for " + _partition + " done");
+
+ }
+ }
+
+ @Transition(to = "OFFLINE", from = "ONLINE")
+ public void onBecomeOfflineFromOnline(Message message, NotificationContext context)
+ throws InterruptedException
+ {
+ LOG.debug(_consumerId + " becomes OFFLINE from ONLINE for " + _partition);
+
+ if (_thread != null)
+ {
+ LOG.debug("Stopping " + _consumerId + " for " + _partition + "...");
+
+ _thread.interrupt();
+ _thread.join(2000);
+ _thread = null;
+ LOG.debug("Stopping " + _consumerId + " for " + _partition + " done");
+
+ }
+ }
+
+ @Transition(to = "DROPPED", from = "OFFLINE")
+ public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
+ {
+ LOG.debug(_consumerId + " becomes DROPPED from OFFLINE for " + _partition);
+ }
+
+ @Transition(to = "OFFLINE", from = "ERROR")
+ public void onBecomeOfflineFromError(Message message, NotificationContext context)
+ {
+ LOG.debug(_consumerId + " becomes OFFLINE from ERROR for " + _partition);
+ }
+
+ @Override
+ public void reset()
+ {
+ LOG.warn("Default reset() invoked");
+
+ if (_thread != null)
+ {
+ LOG.debug("Stopping " + _consumerId + " for " + _partition + "...");
+
+ _thread.interrupt();
+ try
+ {
+ _thread.join(2000);
+ } catch (InterruptedException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ _thread = null;
+ LOG.debug("Stopping " + _consumerId + " for " + _partition + " done");
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
new file mode 100644
index 0000000..673c7fd
--- /dev/null
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
@@ -0,0 +1,21 @@
+package org.apache.helix.recipes.rabbitmq;
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * Factory that builds a {@link ConsumerStateModel} for a given partition,
+ * passing along the consumer id and RabbitMQ server it was constructed with.
+ */
+public class ConsumerStateModelFactory extends StateModelFactory<ConsumerStateModel>
+{
+  private final String _consumerId;
+  private final String _mqServer;
+
+  /**
+   * @param consumerId id handed to every state model created by this factory
+   * @param msServer   RabbitMQ server handed to every created state model
+   */
+  public ConsumerStateModelFactory(String consumerId, String msServer)
+  {
+    this._mqServer = msServer;
+    this._consumerId = consumerId;
+  }
+
+  /**
+   * @param partition partition the new state model is created for
+   * @return a new state model for {@code partition}
+   */
+  @Override
+  public ConsumerStateModel createNewStateModel(String partition)
+  {
+    return new ConsumerStateModel(_consumerId, partition, _mqServer);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java
new file mode 100644
index 0000000..0a6ed5c
--- /dev/null
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java
@@ -0,0 +1,76 @@
+package org.apache.helix.recipes.rabbitmq;
+
+import java.io.IOException;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.QueueingConsumer;
+
+/**
+ * Thread that binds a server-named queue to the "topic_logs" topic exchange
+ * using the partition name as binding key and prints every message it
+ * receives until it is interrupted.
+ */
+public class ConsumerThread extends Thread
+{
+  private static final String EXCHANGE_NAME = "topic_logs";
+  // Decode message bodies with an explicit charset instead of the platform
+  // default, which differs between machines.
+  private static final java.nio.charset.Charset MESSAGE_CHARSET =
+      java.nio.charset.Charset.forName("UTF-8");
+
+  private final String _partition;
+  private final String _mqServer;
+  private final String _consumerId;
+
+  /**
+   * @param partition  partition name, used as the queue binding key
+   * @param mqServer   RabbitMQ host to connect to
+   * @param consumerId id of the owning consumer (used in console output)
+   */
+  public ConsumerThread(String partition, String mqServer, String consumerId)
+  {
+    _partition = partition;
+    _mqServer = mqServer;
+    _consumerId = consumerId;
+  }
+
+  /**
+   * Connects, subscribes and prints deliveries in an endless loop. The loop
+   * ends when waiting for a delivery is interrupted or any other exception is
+   * thrown; the connection is closed in either case.
+   */
+  @Override
+  public void run()
+  {
+    Connection connection = null;
+    try
+    {
+      ConnectionFactory factory = new ConnectionFactory();
+      factory.setHost(_mqServer);
+      connection = factory.newConnection();
+      Channel channel = connection.createChannel();
+
+      channel.exchangeDeclare(EXCHANGE_NAME, "topic");
+      String queueName = channel.queueDeclare().getQueue();
+
+      String bindingKey = _partition;
+      channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
+
+      System.out.println(" [*] " + _consumerId + " Waiting for messages on " + bindingKey + ". To exit press CTRL+C");
+
+      QueueingConsumer consumer = new QueueingConsumer(channel);
+      channel.basicConsume(queueName, true, consumer);
+
+      while (true)
+      {
+        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
+        String message = new String(delivery.getBody(), MESSAGE_CHARSET);
+        String routingKey = delivery.getEnvelope().getRoutingKey();
+
+        System.out.println(" [x] " + _consumerId + " Received '" + routingKey + "':'" + message + "'");
+      }
+    }
+    catch (InterruptedException e)
+    {
+      System.err.println(" [-] " + _consumerId + " on " + _partition + " is interrupted ...");
+    }
+    catch (Exception e)
+    {
+      e.printStackTrace();
+    }
+    finally
+    {
+      if (connection != null)
+      {
+        try
+        {
+          connection.close();
+        }
+        catch (Exception e)
+        {
+          // Besides IOException, close() can fail with an unchecked exception
+          // (NOTE(review): presumably AlreadyClosedException when the broker
+          // already dropped the connection); do not let it escape the thread.
+          System.err.println(" [-] " + _consumerId + " on " + _partition + " failed to close connection: " + e);
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Emitter.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Emitter.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Emitter.java
new file mode 100644
index 0000000..a91523a
--- /dev/null
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Emitter.java
@@ -0,0 +1,56 @@
+package org.apache.helix.recipes.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+/**
+ * Command-line tool that publishes one message per second to the
+ * "topic_logs" topic exchange, each with a randomly chosen "topic_N" routing key.
+ */
+public class Emitter
+{
+
+  private static final String EXCHANGE_NAME = "topic_logs";
+
+  /**
+   * @param args rabbitmqServer, followed by an optional number of messages
+   *             (defaults to {@link Integer#MAX_VALUE})
+   * @throws Exception if connecting, publishing or sleeping fails
+   */
+  public static void main(String[] args) throws Exception
+  {
+    if (args.length < 1)
+    {
+      System.err.println("USAGE: java Emitter rabbitmqServer (e.g. localhost) numberOfMessage (optional)");
+      System.exit(1);
+    }
+
+    final String mqServer = args[0];
+    int count = Integer.MAX_VALUE;
+    if (args.length > 1)
+    {
+      try
+      {
+        count = Integer.parseInt(args[1]);
+      }
+      catch (NumberFormatException e)
+      {
+        // Keep the default, but tell the user the argument was not understood.
+        System.err.println("Ignoring invalid numberOfMessage '" + args[1] + "', using " + count);
+      }
+    }
+    System.out.println("Sending " + count + " messages with random topic id");
+
+    ConnectionFactory factory = new ConnectionFactory();
+    factory.setHost(mqServer);
+    Connection connection = factory.newConnection();
+    try
+    {
+      Channel channel = connection.createChannel();
+
+      channel.exchangeDeclare(EXCHANGE_NAME, "topic");
+
+      for (int i = 0; i < count; i++)
+      {
+        int rand = ((int) (Math.random() * 10000) % SetupConsumerCluster.DEFAULT_PARTITION_NUMBER);
+        String routingKey = "topic_" + rand;
+        String message = "message_" + rand;
+
+        // Explicit charset so the bytes on the wire do not depend on the
+        // platform default encoding.
+        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
+        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
+
+        Thread.sleep(1000);
+      }
+    }
+    finally
+    {
+      // Close the connection even when publishing or sleeping fails.
+      connection.close();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/SetupConsumerCluster.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/SetupConsumerCluster.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/SetupConsumerCluster.java
new file mode 100644
index 0000000..fa01af4
--- /dev/null
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/SetupConsumerCluster.java
@@ -0,0 +1,58 @@
+package org.apache.helix.recipes.rabbitmq;
+
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+
+/**
+ * Command-line tool that creates the consumer cluster: it adds the cluster,
+ * the OnlineOffline state model definition and the "topic" resource, then
+ * triggers a rebalance.
+ */
+public class SetupConsumerCluster
+{
+  public static final String DEFAULT_CLUSTER_NAME = "rabbitmq-consumer-cluster";
+  public static final String DEFAULT_RESOURCE_NAME = "topic";
+  public static final int DEFAULT_PARTITION_NUMBER = 6;
+  public static final String DEFAULT_STATE_MODEL = "OnlineOffline";
+
+  /**
+   * @param args zookeeperAddress (e.g. localhost:2181)
+   */
+  public static void main(String[] args)
+  {
+    if (args.length < 1)
+    {
+      System.err.println("USAGE: java SetupConsumerCluster zookeeperAddress (e.g. localhost:2181)");
+      System.exit(1);
+    }
+
+    final String zkAddress = args[0];
+    final String cluster = DEFAULT_CLUSTER_NAME;
+    final String resource = DEFAULT_RESOURCE_NAME;
+
+    ZkClient zkClient = null;
+    try
+    {
+      zkClient = new ZkClient(zkAddress,
+                              ZkClient.DEFAULT_SESSION_TIMEOUT,
+                              ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+                              new ZNRecordSerializer());
+      ZKHelixAdmin helixAdmin = new ZKHelixAdmin(zkClient);
+
+      // cluster
+      helixAdmin.addCluster(cluster, true);
+
+      // state model definition
+      StateModelConfigGenerator configGenerator = new StateModelConfigGenerator();
+      StateModelDefinition onlineOffline =
+          new StateModelDefinition(configGenerator.generateConfigForOnlineOffline());
+      helixAdmin.addStateModelDef(cluster, DEFAULT_STATE_MODEL, onlineOffline);
+
+      // resource with DEFAULT_PARTITION_NUMBER partitions, auto-rebalanced
+      String rebalanceMode = IdealStateModeProperty.AUTO_REBALANCE.toString();
+      helixAdmin.addResource(cluster, resource, DEFAULT_PARTITION_NUMBER, DEFAULT_STATE_MODEL,
+          rebalanceMode);
+
+      helixAdmin.rebalance(cluster, resource, 1);
+    }
+    finally
+    {
+      if (zkClient != null)
+      {
+        zkClient.close();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/StartClusterManager.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/StartClusterManager.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/StartClusterManager.java
new file mode 100644
index 0000000..5f43b53
--- /dev/null
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/StartClusterManager.java
@@ -0,0 +1,42 @@
+package org.apache.helix.recipes.rabbitmq;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.HelixControllerMain;
+
+/**
+ * Command-line tool that starts a standalone Helix controller for the
+ * consumer cluster and keeps the JVM alive until it is shut down.
+ */
+public class StartClusterManager
+{
+  /**
+   * @param args zookeeperAddress (e.g. localhost:2181)
+   */
+  public static void main(String[] args)
+  {
+    if (args.length < 1)
+    {
+      System.err.println("USAGE: java StartClusterManager zookeeperAddress (e.g. localhost:2181)");
+      System.exit(1);
+    }
+
+    final String zkAddress = args[0];
+    final String cluster = SetupConsumerCluster.DEFAULT_CLUSTER_NAME;
+
+    try
+    {
+      final HelixManager controller =
+          HelixControllerMain.startHelixController(zkAddress, cluster, null,
+              HelixControllerMain.STANDALONE);
+
+      // Disconnect the controller when the JVM shuts down.
+      Thread shutdownHook = new Thread()
+      {
+        @Override
+        public void run()
+        {
+          System.out.println("Shutting down cluster manager: " + controller.getInstanceName());
+          controller.disconnect();
+        }
+      };
+      Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+      // Joining the current thread blocks until it is interrupted.
+      Thread.currentThread().join();
+    }
+    catch (Exception e)
+    {
+      e.printStackTrace();
+    }
+  }
+}