Posted to commits@atlas.apache.org by ma...@apache.org on 2017/06/28 05:57:20 UTC

[07/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueManager.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueManager.java b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueManager.java
new file mode 100755
index 0000000..e759ecc
--- /dev/null
+++ b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueManager.java
@@ -0,0 +1,488 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging.kafka;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.engine.KafkaGroupOffsetInfo;
+import org.apache.atlas.odf.api.engine.KafkaStatus;
+import org.apache.atlas.odf.api.engine.KafkaTopicStatus;
+import org.apache.atlas.odf.api.engine.ThreadStatus;
+import org.apache.atlas.odf.api.settings.KafkaMessagingConfiguration;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+import org.apache.atlas.odf.core.notification.NotificationListener;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.engine.MessagingStatus;
+import org.apache.atlas.odf.api.engine.PartitionOffsetInfo;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage;
+import org.apache.atlas.odf.core.controlcenter.AdminQueueProcessor;
+import org.apache.atlas.odf.core.controlcenter.ConfigChangeQueueProcessor;
+import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore.StatusQueueProcessor;
+import org.apache.atlas.odf.core.controlcenter.DiscoveryServiceStarter;
+import org.apache.atlas.odf.core.controlcenter.ExecutorServiceFactory;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
+import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager.ThreadStartupResult;
+import org.apache.atlas.odf.core.controlcenter.TrackerUtil;
+import org.apache.atlas.odf.core.notification.NotificationManager;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.common.TopicExistsException;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+public class KafkaQueueManager implements DiscoveryServiceQueueManager {
+
+	public static final String TOPIC_NAME_STATUS_QUEUE = "odf-status-topic";
+	public static final String TOPIC_NAME_ADMIN_QUEUE = "odf-admin-topic";
+	public static final String ADMIN_QUEUE_KEY = "odf-admin-queue-key";
+	public static final String SERVICE_TOPIC_PREFIX = "odf-topic-";
+
+	public static final RackAwareMode DEFAULT_RACK_AWARE_MODE = RackAwareMode.Disabled$.MODULE$;
+	
+	//use static UUID so that no unnecessary consumer threads are started
+	private final static String UNIQUE_SESSION_THREAD_ID = UUID.randomUUID().toString();
+
+	private final static int THREAD_STARTUP_TIMEOUT_MS = 5000;
+	
+	private static List<String> queueConsumerNames = null;
+	private static Object startLock = new Object();
+
+	private final static Logger logger = Logger.getLogger(KafkaQueueManager.class.getName());
+
+	private ThreadManager threadManager;
+	private SettingsManager odfConfig;
+	private String zookeeperConnectString;
+
+	public KafkaQueueManager() {
+		ODFInternalFactory factory = new ODFInternalFactory();
+		threadManager = factory.create(ThreadManager.class);
+		ExecutorServiceFactory esf = factory.create(ExecutorServiceFactory.class);
+		threadManager.setExecutorService(esf.createExecutorService());
+		zookeeperConnectString = factory.create(Environment.class).getZookeeperConnectString();
+		odfConfig = factory.create(SettingsManager.class);
+	}
+	
+	
+	public Properties getConsumerConfigProperties(String consumerGroupID, boolean consumeFromEnd) {
+		Properties kafkaConsumerProps = odfConfig.getKafkaConsumerProperties();
+		kafkaConsumerProps.put("group.id", consumerGroupID);
+		if (zookeeperConnectString != null) {
+			kafkaConsumerProps.put("zookeeper.connect", zookeeperConnectString);
+		}
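+		// consumeFromEnd controls where a consumer group without committed offsets starts reading:
+		// "latest" skips messages produced before the consumer existed, "earliest" replays the queue from the beginning.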
+		if (consumeFromEnd) {
+			kafkaConsumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+		} else {
+			kafkaConsumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+		}
+		kafkaConsumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+		kafkaConsumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+		kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
+		kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);		
+		return kafkaConsumerProps;
+	}
+
+	private String getBootstrapServers() {
+		final List<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperConnectString);
+		StringBuilder servers = new StringBuilder();
+		final Iterator<String> iterator = brokers.iterator();
+		while(iterator.hasNext()){
+			servers.append(iterator.next());
+			if(iterator.hasNext()){
+				servers.append(",");
+			}
+		}
+		return servers.toString();
+	}
+
+	protected void createTopicIfNotExists(String topicName, int partitionCount, Properties props) {
+		String zkHosts = props.getProperty("zookeeper.connect");
+		ZkClient zkClient = null;
+		try {
+			zkClient = new ZkClient(zkHosts, Integer.valueOf(props.getProperty("zookeeperSessionTimeoutMs")),
+					Integer.valueOf(props.getProperty("zookeeperConnectionTimeoutMs")), ZKStringSerializer$.MODULE$);
+		} catch (ZkTimeoutException zkte) {
+			logger.log(Level.SEVERE, "Could not connect to the Zookeeper instance at ''{0}''. Please ensure that Zookeeper is running", zkHosts);
+		}
+		try {
+			logger.log(Level.FINEST, "Checking if topic ''{0}'' already exists", topicName);
+			// the topic is created with the requested partition count and the configured
+			// broker replication factor, no special per-topic config needed
+			try {
+				final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHosts), false);
+				if (!AdminUtils.topicExists(zkUtils, topicName)) {
+					logger.log(Level.INFO, "Topic ''{0}'' does not exist, creating it", topicName);
+
+					//FIXME zkUtils isSecure parameter? Only with SSL! --> parse zkhosts?
+					KafkaMessagingConfiguration kafkaConfig = ((KafkaMessagingConfiguration) odfConfig.getODFSettings().getMessagingConfiguration());
+					AdminUtils.createTopic(zkUtils, topicName, partitionCount, kafkaConfig.getKafkaBrokerTopicReplication(),
+							new Properties(), DEFAULT_RACK_AWARE_MODE);
+					logger.log(Level.FINE, "Topic ''{0}'' created", topicName);
+					//wait before continuing to make sure the topic exists BEFORE consumers are started
+					try {
+						Thread.sleep(1500);
+					} catch (InterruptedException e) {
+						// restore the interrupt flag; the topic has already been created at this point
+						Thread.currentThread().interrupt();
+					}
+				}
+			} catch (TopicExistsException ex) {
+				logger.log(Level.FINE, "Topic ''{0}'' already exists.", topicName);
+			}
+		} finally {
+			if (zkClient != null) {
+				zkClient.close();
+			}
+		}
+	}
+
+
+	private String getTopicName(ServiceRuntime runtime) {
+		return "odf-runtime-" + runtime.getName();
+	}
+	
+	private String getConsumerGroup(ServiceRuntime runtime) {
+		return getTopicName(runtime) + "_group";
+	}
+	
+	private List<ThreadStartupResult> scheduleAllRuntimeConsumers() {
+		List<ThreadStartupResult> results = new ArrayList<>();
+		for (ServiceRuntime runtime : ServiceRuntimes.getActiveRuntimes()) {
+			results.addAll(scheduleRuntimeConsumers(runtime));
+		}
+		return results;
+	}
+	
+	private List<ThreadStartupResult> scheduleRuntimeConsumers(ServiceRuntime runtime) {
+		logger.log(Level.FINER, "Create consumers on queue for runtime ''{0}'' if it doesn't already exist", runtime.getName());
+
+		String topicName = getTopicName(runtime);
+		String consumerGroupId = getConsumerGroup(runtime);
+		Properties kafkaConsumerProps = getConsumerConfigProperties(consumerGroupId, false); // read entries from beginning if consumer was never initialized 
+		String threadName = "RuntimeQueueConsumer" + topicName;
+		List<ThreadStartupResult> result = new ArrayList<ThreadStartupResult>();
+		if (threadManager.getStateOfUnmanagedThread(threadName) != ThreadStatus.ThreadState.RUNNING) {
+			createTopicIfNotExists(topicName, 1, kafkaConsumerProps);
+			ThreadStartupResult startupResult = threadManager.startUnmanagedThread(threadName, new KafkaRuntimeConsumer(runtime, topicName, kafkaConsumerProps, new DiscoveryServiceStarter()));
+			result.add(startupResult);		
+		} else {
+			result.add(new ThreadStartupResult(threadName) {
+				@Override
+				public boolean isNewThreadCreated() {
+					return false;
+				}
+
+				@Override
+				public boolean isReady() {
+					return true;
+				}
+			});
+		}
+		return result;
+	}
+
+	
+	private List<ThreadStartupResult> scheduleConsumerThreads(String topicName, int partitionCount, Properties kafkaConsumerProps, String threadName,
+			List<QueueMessageProcessor> processors) {
+		if (processors.size() != partitionCount) {
+			final String msg = "The number of processors must be equal to the partition count in order to support parallel processing";
+			logger.warning(msg);
+			throw new RuntimeException(msg);
+		}
+		createTopicIfNotExists(topicName, partitionCount, kafkaConsumerProps);
+
+		List<ThreadStartupResult> result = new ArrayList<ThreadStartupResult>();
+		for (int no = 0; no < partitionCount; no++) {
+			if (threadManager.getStateOfUnmanagedThread(threadName + "_" + no) != ThreadStatus.ThreadState.RUNNING) {
+				QueueMessageProcessor processor = processors.get(no);
+				ThreadStartupResult created = threadManager.startUnmanagedThread(threadName + "_" + no, new KafkaQueueConsumer(topicName, kafkaConsumerProps, processor));
+				if (created.isNewThreadCreated()) {
+					logger.log(Level.INFO, "Created new consumer thread on topic ''{0}'' with group ID ''{1}'', thread name: ''{2}'', properties: ''{3}''",
+							new Object[] { topicName, kafkaConsumerProps.getProperty("group.id"), threadName + "_" + no, kafkaConsumerProps.toString() });
+				} else {
+					logger.log(Level.FINE, "Consumer thread with thread name: ''{0}'' already exists, doing nothing", new Object[] { threadName + "_" + no });
+				}
+				result.add(created);
+			} else {
+				result.add(new ThreadStartupResult(threadName) {
+					@Override
+					public boolean isNewThreadCreated() {
+						return false;
+					}
+
+					@Override
+					public boolean isReady() {
+						return true;
+					}
+				});
+			}
+		}
+		return result;
+	}
+
+	private ThreadStartupResult scheduleConsumerThread(String topicName, Properties kafkaConsumerProps, String threadName, QueueMessageProcessor processor) {
+		return scheduleConsumerThreads(topicName, 1, kafkaConsumerProps, threadName, Arrays.asList(processor)).get(0);
+	}
+
+	@Override
+	public void enqueue(AnalysisRequestTracker tracker) {
+		DiscoveryServiceRequest dsRequest = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
+		if (dsRequest == null) {
+			throw new RuntimeException("Tracker is finished, should not be enqueued");
+		}
+		String dsID = dsRequest.getDiscoveryServiceId();
+		dsRequest.setPutOnRequestQueue(System.currentTimeMillis());
+		ServiceRuntime runtime = ServiceRuntimes.getRuntimeForDiscoveryService(dsID);
+		if (runtime == null) {
+			throw new RuntimeException(MessageFormat.format("Service runtime for service ''{0}'' was not found.", dsID));
+		}
+		enqueueJSONMessage(getTopicName(runtime), tracker, tracker.getRequest().getId());
+	}
+
+	private void enqueueJSONMessage(String topicName, Object jsonObject, String key) {
+		String value = null;
+		try {
+			value = JSONUtils.toJSON(jsonObject);
+		} catch (JSONException e) {
+			throw new RuntimeException(e);
+		}
+		new ODFInternalFactory().create(KafkaProducerManager.class).sendMsg(topicName, key, value);
+	}
+
+	List<ThreadStartupResult> scheduleStatusQueueConsumers() {
+		logger.log(Level.FINER, "Create consumers on status queue if they don't already exist");
+		List<ThreadStartupResult> results = new ArrayList<ThreadStartupResult>();
+
+		// create consumer thread for the status watcher of all trackers
+		String statusWatcherConsumerGroupID = "DSStatusWatcherConsumerGroup" + UNIQUE_SESSION_THREAD_ID; // have a new group id on each node that reads all from the beginning
+		// always read from beginning for the status queue
+		Properties statusWatcherKafkaConsumerProps = getConsumerConfigProperties(statusWatcherConsumerGroupID, false);
+		final String statusWatcherThreadName = "StatusWatcher" + TOPIC_NAME_STATUS_QUEUE; // a fixed name
+		String threadNameWithPartition = statusWatcherThreadName + "_0";
+		final ThreadStatus.ThreadState stateOfUnmanagedThread = threadManager.getStateOfUnmanagedThread(threadNameWithPartition);
+		logger.fine("State of status watcher thread: " + stateOfUnmanagedThread);
+		if (stateOfUnmanagedThread != ThreadStatus.ThreadState.RUNNING) {
+			final ThreadStartupResult scheduleConsumerThread = scheduleConsumerThread(TOPIC_NAME_STATUS_QUEUE, statusWatcherKafkaConsumerProps, statusWatcherThreadName,
+					new StatusQueueProcessor());
+			results.add(scheduleConsumerThread);
+		} else {
+			results.add(new ThreadStartupResult(statusWatcherThreadName) {
+				@Override
+				public boolean isNewThreadCreated() {
+					return false;
+				}
+
+				@Override
+				public boolean isReady() {
+					return true;
+				}
+			});
+		}
+
+		return results;
+	}
+
+
+	@Override
+	public void enqueueInStatusQueue(StatusQueueEntry sqe) {
+		enqueueJSONMessage(TOPIC_NAME_STATUS_QUEUE, sqe, StatusQueueEntry.getRequestId(sqe));
+	}
+
+
+	private List<ThreadStartupResult> scheduleAdminQueueConsumers() {
+		List<ThreadStartupResult> results = new ArrayList<ThreadStartupResult>();
+		//schedule admin queue consumers
+		// consumer group so that every node receives events
+		String adminWatcherConsumerGroupID = "DSAdminQueueConsumerGroup" + UNIQUE_SESSION_THREAD_ID; // have a new group id on each node 
+		Properties adminWatcherKafkaConsumerProps = getConsumerConfigProperties(adminWatcherConsumerGroupID, true);
+		final String adminWatcherThreadName = "AdminWatcher" + TOPIC_NAME_ADMIN_QUEUE;
+		String threadNameWithPartition = adminWatcherThreadName + "_0";
+		if (threadManager.getStateOfUnmanagedThread(threadNameWithPartition) != ThreadStatus.ThreadState.RUNNING) {
+			results.add(scheduleConsumerThread(TOPIC_NAME_ADMIN_QUEUE, adminWatcherKafkaConsumerProps, adminWatcherThreadName, new AdminQueueProcessor()));
+			// consumer group so only one node receives events
+			String distributedAdminConsumerGroup = "DSAdminQueueConsumerGroupCommon";
+			Properties kafkaProps = getConsumerConfigProperties(distributedAdminConsumerGroup, true);
+			final String threadName = "DistributedAdminWatcher";
+			results.add(scheduleConsumerThread(TOPIC_NAME_ADMIN_QUEUE, kafkaProps, threadName, new ConfigChangeQueueProcessor()));
+		} else {
+			results.add(new ThreadStartupResult(adminWatcherThreadName) {
+				@Override
+				public boolean isNewThreadCreated() {
+					return false;
+				}
+
+				@Override
+				public boolean isReady() {
+					return true;
+				}
+			});
+		}
+		return results;
+	}
+
+	@Override
+	public void enqueueInAdminQueue(AdminMessage message) {
+		enqueueJSONMessage(TOPIC_NAME_ADMIN_QUEUE, message, ADMIN_QUEUE_KEY);
+	}
+
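+	// start() is idempotent within the JVM: startLock and the static queueConsumerNames list ensure
+	// that the consumer threads are only scheduled once.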
+	@Override
+	public void start() throws TimeoutException {
+		synchronized (startLock) {
+			if (queueConsumerNames == null) {
+				List<ThreadStartupResult> results = new ArrayList<>();
+				results.addAll(scheduleStatusQueueConsumers());
+				results.addAll(scheduleAdminQueueConsumers());
+				results.addAll(scheduleAllRuntimeConsumers());
+				results.addAll(scheduleNotificationListenerThreads());
+				List<String> consumerNames = new ArrayList<>();
+				for (ThreadStartupResult tsr : results) {
+					consumerNames.add(tsr.getThreadId());
+				}
+				queueConsumerNames = consumerNames;
+				this.threadManager.waitForThreadsToBeReady(THREAD_STARTUP_TIMEOUT_MS * results.size(), results);
+				logger.info("KafkaQueueManager successfully initialized");
+			}
+		}
+	}
+	
+	public void stop() {
+		synchronized (startLock) {
+			if (queueConsumerNames != null) {
+				threadManager.shutdownThreads(queueConsumerNames);
+				queueConsumerNames = null;
+			}
+		}
+	}
+
+	@Override
+	public MessagingStatus getMessagingStatus() {
+		KafkaStatus status = new KafkaStatus();
+		KafkaMonitor monitor = new ODFInternalFactory().create(KafkaMonitor.class);
+		status.setBrokers(monitor.getBrokers(zookeeperConnectString));
+
+		List<String> topics = new ArrayList<String>(Arrays.asList(KafkaQueueManager.TOPIC_NAME_ADMIN_QUEUE, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE));
+		for (DiscoveryServiceProperties info : new ODFFactory().create().getDiscoveryServiceManager().getDiscoveryServicesProperties()) {
+			topics.add(KafkaQueueManager.SERVICE_TOPIC_PREFIX + info.getId());
+		}
+
+		List<KafkaTopicStatus> topicStatusList = new ArrayList<KafkaTopicStatus>();
+		for (String topic : topics) {
+			KafkaTopicStatus topicStatus = getTopicStatus(topic, monitor);
+			topicStatusList.add(topicStatus);
+		}
+		status.setTopicStatus(topicStatusList);
+		return status;
+	}
+
+	private KafkaTopicStatus getTopicStatus(String topic, KafkaMonitor monitor) {
+		KafkaTopicStatus topicStatus = new KafkaTopicStatus();
+		topicStatus.setTopic(topic);
+		topicStatus.setBrokerPartitionMessageInfo(monitor.getMessageCountForTopic(zookeeperConnectString, topic));
+
+		List<KafkaGroupOffsetInfo> offsetInfoList = new ArrayList<KafkaGroupOffsetInfo>();
+		List<String> consumerGroupsFromZookeeper = monitor.getConsumerGroups(zookeeperConnectString, topic);
+		for (String group : consumerGroupsFromZookeeper) {
+			KafkaGroupOffsetInfo offsetInfoContainer = new KafkaGroupOffsetInfo();
+			offsetInfoContainer.setGroupId(group);
+			List<PartitionOffsetInfo> offsetsForTopic = monitor.getOffsetsForTopic(zookeeperConnectString, group, topic);
+			for (PartitionOffsetInfo info : offsetsForTopic) {
+				// to reduce clutter, only if at least 1 partition has an offset > -1 (== any offset) for this consumer group, 
+				// it will be included in the result
+				if (info.getOffset() > -1) {
+					offsetInfoContainer.setOffsets(offsetsForTopic);
+					offsetInfoList.add(offsetInfoContainer);
+					break;
+				}
+			}
+		}
+		topicStatus.setConsumerGroupOffsetInfo(offsetInfoList);
+
+		topicStatus.setPartitionBrokersInfo(monitor.getPartitionInfoForTopic(zookeeperConnectString, topic));
+		return topicStatus;
+	}
+
+	private List<ThreadStartupResult> scheduleNotificationListenerThreads() {
+		NotificationManager nm = new ODFInternalFactory().create(NotificationManager.class);
+		List<NotificationListener> listeners = nm.getListeners();
+		List<ThreadStartupResult> result = new ArrayList<>();
+		if (listeners == null) {
+			return result;
+		}
+		final OpenDiscoveryFramework odf = new ODFFactory().create();
+		for (final NotificationListener listener : listeners) {
+			String topicName = listener.getTopicName();
+			String consumerGroupId = "ODFNotificationGroup" + topicName;
+			Properties kafkaConsumerProps = getConsumerConfigProperties(consumerGroupId, true);  
+			String threadName = "NotificationListenerThread" + topicName;
+			if (threadManager.getStateOfUnmanagedThread(threadName) != ThreadStatus.ThreadState.RUNNING) {
+				KafkaQueueConsumer consumer = new KafkaQueueConsumer(topicName, kafkaConsumerProps, new QueueMessageProcessor() {
+					
+					@Override
+					public void process(ExecutorService executorService, String msg, int partition, long msgOffset) {
+						try {
+							listener.onEvent(msg, odf);
+						} catch(Exception exc) {
+							String errorMsg = MessageFormat.format("Notification listener ''{0}'' has thrown an exception. Ignoring it", listener.getName());
+							logger.log(Level.WARNING, errorMsg, exc);
+						}
+					}
+				});
+				ThreadStartupResult startupResult = threadManager.startUnmanagedThread(threadName, consumer);
+				result.add(startupResult);		
+			} else {
+				result.add(new ThreadStartupResult(threadName) {
+					@Override
+					public boolean isNewThreadCreated() {
+						return false;
+					}
+
+					@Override
+					public boolean isReady() {
+						return true;
+					}
+				});
+			}
+		}
+		return result;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaRuntimeConsumer.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaRuntimeConsumer.java b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaRuntimeConsumer.java
new file mode 100755
index 0000000..73d98e7
--- /dev/null
+++ b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaRuntimeConsumer.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.controlcenter.ODFRunnable;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+
+/**
+ * This consumer is started for a specific service runtime and starts a KafkaQueueConsumer
+ * once that runtime becomes available.
+ */
+public class KafkaRuntimeConsumer implements ODFRunnable {
+
+	Logger logger = Logger.getLogger(KafkaRuntimeConsumer.class.getName());
+
+	private ServiceRuntime runtime;
+	private boolean isShutdown = false;
+	private ExecutorService executorService = null;
+	private KafkaQueueConsumer kafkaQueueConsumer = null;
+
+	private String topic;
+	private Properties config;
+	private QueueMessageProcessor processor;
+
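+	// stops the underlying KafkaQueueConsumer as soon as this consumer is shut down or the
+	// runtime reports that it is (temporarily) unavailable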
+	private KafkaQueueConsumer.ConsumptionCallback callback = new KafkaQueueConsumer.ConsumptionCallback() {
+		@Override
+		public boolean stopConsumption() {
+			return isShutdown || (runtime.getWaitTimeUntilAvailable() > 0);
+		}
+	};
+
+	public KafkaRuntimeConsumer(ServiceRuntime runtime, String topicName, Properties config, QueueMessageProcessor processor) {
+		this.runtime = runtime;
+		this.processor = processor;
+		this.topic = topicName;
+		this.config = config;
+	}
+
+	@Override
+	public void run() {
+		logger.log(Level.INFO, "Starting runtime consumer for topic ''{0}''", topic);
+		while (!isShutdown) {
+			long waitTime = runtime.getWaitTimeUntilAvailable();
+			if (waitTime <= 0) {
+				logger.log(Level.INFO, "Starting Kafka consumer for topic ''{0}''", topic);
+				kafkaQueueConsumer = new KafkaQueueConsumer(topic, config, processor, callback);
+				kafkaQueueConsumer.setExecutorService(executorService);
+				// run consumer synchronously
+				kafkaQueueConsumer.run();
+				logger.log(Level.INFO, "Kafka consumer for topic ''{0}'' is finished", topic);
+
+				// if we are here, this means that the consumer was cancelled
+				// either directly or (more likely) through the Consumption callback 
+				kafkaQueueConsumer = null;
+			} else {
+				try {
+					logger.log(Level.FINER, "Runtime ''{0}'' is not available, waiting for ''{1}''ms", new Object[]{runtime.getName(), waitTime });
+					Thread.sleep(waitTime);
+				} catch (InterruptedException e) {
+					throw new RuntimeException(e);
+				}
+			}
+		}
+		logger.log(Level.INFO, "Kafka runtime consumer for topic ''{0}'' has shut down", topic);
+	}
+
+	@Override
+	public void setExecutorService(ExecutorService executorService) {
+		this.executorService = executorService;
+	}
+
+	@Override
+	public void cancel() {
+		isShutdown = true;
+		if (kafkaQueueConsumer != null) {
+			kafkaQueueConsumer.cancel();
+		}
+	}
+
+	@Override
+	public boolean isReady() {
+		return true;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/MessageSearchConsumer.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/MessageSearchConsumer.java b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/MessageSearchConsumer.java
new file mode 100755
index 0000000..9c08f3a
--- /dev/null
+++ b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/MessageSearchConsumer.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging.kafka;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.controlcenter.ODFRunnable;
+import org.apache.atlas.odf.api.engine.PartitionOffsetInfo;
+
+public class MessageSearchConsumer implements ODFRunnable {
+	private static final long POLLING_DURATION_MS = 100;
+	private static final int MAX_POLL_COUNT = 5;
+
+	private Logger logger = Logger.getLogger(MessageSearchConsumer.class.getName());
+	private SearchCompletedCallback searchCompletedCallback;
+	private List<String> searchStrings;
+	protected String topic;
+	private KafkaConsumer<String, String> kafkaConsumer;
+	private boolean shutdown;
+	private boolean ready = false;
+	private List<PartitionOffsetInfo> maxOffsetsForTopic = new ArrayList<PartitionOffsetInfo>();
+
+
+	public MessageSearchConsumer(String topic, SearchCompletedCallback completitionCallback, List<String> searchStrings) {
+		setTopic(topic);
+		setSearchStrings(searchStrings);
+		setCompletitionCallback(completitionCallback);
+	}
+
+	public MessageSearchConsumer() {
+	}
+
+	protected List<PartitionOffsetInfo> retrieveTopicOffsets() {
+		List<PartitionOffsetInfo> offsetsForTopic = new ArrayList<PartitionOffsetInfo>();
+		String zookeeperConnect = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();
+
+		if (zookeeperConnect != null) {
+			final KafkaMonitor create = new ODFInternalFactory().create(KafkaMonitor.class);
+			for (int part : create.getPartitionsForTopic(zookeeperConnect, this.topic)) {
+				offsetsForTopic.add(create.getOffsetsOfLastMessagesForTopic(zookeeperConnect, this.topic, part));
+			}
+		}
+		return offsetsForTopic;
+	}
+
+	public void setTopic(String topic) {
+		this.topic = topic;
+	}
+
+	public void setSearchStrings(List<String> searchStrings) {
+		this.searchStrings = searchStrings;
+	}
+
+	public void setCompletitionCallback(SearchCompletedCallback completitionCallback) {
+		this.searchCompletedCallback = completitionCallback;
+	}
+
+	protected Properties getKafkaConsumerProperties() {
+		Properties consumerProperties = new ODFFactory().create().getSettingsManager().getKafkaConsumerProperties();
+		consumerProperties.put("group.id", UUID.randomUUID().toString() + "_searchConsumer");
+		final String zookeeperConnect = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();
+		consumerProperties.put("zookeeper.connect", zookeeperConnect);
+		consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+		consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+		consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+		final Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperConnect).iterator();
+		StringBuilder brokersString = new StringBuilder();
+		while (brokers.hasNext()) {
+			brokersString.append(brokers.next());
+			if (brokers.hasNext()) {
+				brokersString.append(",");
+			}
+		}
+		consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString.toString());
+		consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+		return consumerProperties;
+	}
+
+	@Override
+	public void run() {
+		this.maxOffsetsForTopic = retrieveTopicOffsets();
+		final String logPrefix = "Consumer for topic " + topic + ": ";
+		try {
+
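+			// track which partitions have been read up to their last known offset; the search ends once
+			// every partition is fully scanned or the poll limit is reached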
+			Map<Integer, Boolean> maxOffsetReachedMap = new HashMap<Integer, Boolean>();
+			if (maxOffsetsForTopic.isEmpty()) {
+				logger.info("No offsets found for topic " + this.topic + ", therefore no matching messages can be found");
+				if (searchCompletedCallback != null) {
+					searchCompletedCallback.onDoneSearching(new HashMap<String, PartitionOffsetInfo>());
+					return;
+				}
+			}
+			for (PartitionOffsetInfo info : maxOffsetsForTopic) {
+				//if the max offset is -1, no message exists on the partition
+				if (info.getOffset() > -1) {
+					maxOffsetReachedMap.put(info.getPartitionId(), false);
+				}
+			}
+
+			Map<String, PartitionOffsetInfo> resultMap = new HashMap<String, PartitionOffsetInfo>();
+
+			Properties consumerProperties = getKafkaConsumerProperties();
+
+			if (this.kafkaConsumer == null) {
+				logger.fine(logPrefix + " create new consumer for topic " + topic);
+				try {
+					this.kafkaConsumer = new KafkaConsumer<String, String>(consumerProperties);
+					//subscribe() relies on Kafka's automatic partition assignment; since every search uses its own unique consumer group, this consumer gets assigned all partitions of the topic
+					kafkaConsumer.subscribe(Arrays.asList(topic));
+				} catch (ZkTimeoutException zkte) {
+					String zkHosts = consumerProperties.getProperty("zookeeper.connect");
+					logger.log(Level.SEVERE, logPrefix + " Could not connect to the Zookeeper instance at ''{0}''. Please ensure that Zookeeper is running", zkHosts);
+					throw zkte;
+				}
+			}
+			logger.log(Level.INFO, logPrefix + " Consumer ''{1}'' is now listening on ODF queue ''{0}'' with configuration {2}",
+					new Object[] { topic, kafkaConsumer, consumerProperties });
+
+			int pollCount = 0;
+			while (!Thread.interrupted() && pollCount < MAX_POLL_COUNT && !shutdown && kafkaConsumer != null) {
+				logger.info("searching ...");
+				pollCount++;
+				ConsumerRecords<String, String> records = kafkaConsumer.poll(POLLING_DURATION_MS);
+				ready = true;
+				final Iterator<ConsumerRecord<String, String>> polledRecords = records.records(topic).iterator();
+				
+				while (polledRecords.hasNext() && !shutdown) {
+					final ConsumerRecord<String, String> next = polledRecords.next();
+					for (String s : searchStrings) {
+						if ((next.key() != null && next.key().equals(s)) || (next.value() != null && next.value().contains(s))) {
+							final PartitionOffsetInfo position = new PartitionOffsetInfo();
+							position.setOffset(next.offset());
+							position.setPartitionId(next.partition());
+							resultMap.put(s, position);
+						}
+					}
+
+					if (next.offset() == maxOffsetsForTopic.get(next.partition()).getOffset()) {
+						maxOffsetReachedMap.put(next.partition(), true);
+					}
+
+					boolean allCompleted = true;
+					for (Entry<Integer, Boolean> entry : maxOffsetReachedMap.entrySet()) {
+						if (!entry.getValue()) {
+							allCompleted = false;
+							break;
+						}
+					}
+
+					if (allCompleted) {
+						logger.info("Done searching all messages");
+						if (searchCompletedCallback != null) {
+							searchCompletedCallback.onDoneSearching(resultMap);
+							return;
+						}
+						shutdown = true;
+					}
+				}
+			}
+		} catch (Exception exc) {
+			String msg = MessageFormat.format(" Caught exception on queue ''{0}''", topic);
+			logger.log(Level.WARNING, logPrefix + msg, exc);
+		} finally {
+			if (kafkaConsumer != null) {
+				logger.log(Level.FINE, logPrefix + "Closing consumer on topic ''{0}''", topic);
+				kafkaConsumer.close();
+				logger.log(Level.FINE, logPrefix + "Closed consumer on topic ''{0}''", topic);
+				kafkaConsumer = null;
+			}
+		}
+		logger.log(Level.FINE, logPrefix + "Finished consumer on topic ''{0}''", topic);
+	}
+
+	@Override
+	public void setExecutorService(ExecutorService service) {
+
+	}
+
+	@Override
+	public void cancel() {
+		this.shutdown = true;
+	}
+
+	@Override
+	public boolean isReady() {
+		return ready;
+	}
+
+	public interface SearchCompletedCallback {
+		void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/main/resources/org/apache/atlas/odf/odf-implementation.properties b/odf/odf-messaging/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
new file mode 100755
index 0000000..95c1f71
--- /dev/null
+++ b/odf/odf-messaging/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
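+# Binds the DiscoveryServiceQueueManager interface to its Kafka-based implementation.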
+DiscoveryServiceQueueManager=org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueConsumerExceptionTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueConsumerExceptionTest.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueConsumerExceptionTest.java
new file mode 100755
index 0000000..396193f
--- /dev/null
+++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueConsumerExceptionTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.engine.ThreadStatus.ThreadState;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaMonitor;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+
+import kafka.admin.AdminUtils;
+import kafka.common.TopicExistsException;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+public class KafkaQueueConsumerExceptionTest extends ODFTestcase {
+	static Logger logger = ODFTestLogger.get();
+	static final String topicName = "my_dummy_test_topic";
+	static String zookeeperHost = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();
+
+	@BeforeClass
+	public static void setupTopic() {
+		ZkClient zkClient = null;
+		try {
+			zkClient = new ZkClient(zookeeperHost, 5000, 5000, ZKStringSerializer$.MODULE$);
+			logger.log(Level.FINEST, "Checking if topic ''{0}'' already exists", topicName);
+			// using partition size 1 and replication size 1, no special
+			// per-topic config needed
+			logger.log(Level.FINE, "Creating topic ''{0}'' if it does not exist yet", topicName);
+			//FIXME zkUtils isSecure parameter? Only with SSL! --> parse zkhosts?
+			AdminUtils.createTopic(new ZkUtils(zkClient, new ZkConnection(zookeeperHost), false), topicName, 1, 1, new Properties(), KafkaQueueManager.DEFAULT_RACK_AWARE_MODE);
+			logger.log(Level.FINE, "Topic ''{0}'' created", topicName);
+		} catch (TopicExistsException ex) {
+			logger.log(Level.FINE, "Topic ''{0}'' already exists.", topicName);
+		} catch (ZkTimeoutException zkte) {
+			logger.log(Level.SEVERE, "Could not connect to the Zookeeper instance at ''{0}''. Please ensure that Zookeeper is running", zookeeperHost);
+		} finally {
+			if (zkClient != null) {
+				zkClient.close();
+			}
+		}
+	}
+
+	@Test
+	public void testExceptionAndRetryDuringProcessing() throws InterruptedException, ExecutionException, TimeoutException {
+		final ODFInternalFactory odfFactory = new ODFInternalFactory();
+		final String groupId = "retrying-exception-dummy-consumer";
+		Properties kafkaConsumerProperties = new KafkaQueueManager().getConsumerConfigProperties(groupId, true);
+		kafkaConsumerProperties.put("group.id", groupId);
+		final List<String> consumedMsgs1 = new ArrayList<String>();
+		KafkaQueueConsumer cnsmr = new KafkaQueueConsumer(topicName, kafkaConsumerProperties, new QueueMessageProcessor() {
+
+			@Override
+			public void process(ExecutorService executorService, String msg, int partition, long offset) {
+				consumedMsgs1.add(msg);
+				logger.info("retry_consumer process " + msg + " throw exception and try again");
+				throw new RuntimeException("Oops!");
+			}
+		});
+
+		final ThreadManager threadManager = odfFactory.create(ThreadManager.class);
+		final String consumerThread = "TEST_CONSUMER_RETRY_RUNNING";
+		threadManager.waitForThreadsToBeReady(10000, Arrays.asList(threadManager.startUnmanagedThread(consumerThread, cnsmr)));
+
+		sendMsg("TEST_MSG");
+		sendMsg("TEST_MSG2");
+
+		Thread.sleep(2000);
+
+		Assert.assertEquals(2 * KafkaQueueConsumer.MAX_PROCESSING_EXCEPTIONS, consumedMsgs1.size());
+
+		final ThreadState stateOfUnmanagedThread = threadManager.getStateOfUnmanagedThread(consumerThread);
+		Assert.assertEquals(ThreadState.RUNNING, stateOfUnmanagedThread);
+	}
+
+	void sendMsg(String msg) throws InterruptedException, ExecutionException, TimeoutException {
+		SettingsManager odfConfig = new ODFFactory().create().getSettingsManager();
+
+		Properties props = odfConfig.getKafkaProducerProperties();
+		final Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperHost).iterator();
+		StringBuilder brokersString = new StringBuilder();
+		while (brokers.hasNext()) {
+			brokersString.append(brokers.next());
+			if (brokers.hasNext()) {
+				brokersString.append(",");
+			}
+		}
+		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString.toString());
+
+		final KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
+		ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, UUID.randomUUID().toString(), msg);
+		producer.send(producerRecord).get(3000, TimeUnit.MILLISECONDS);
+		producer.close();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueManagerTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueManagerTest.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueManagerTest.java
new file mode 100755
index 0000000..cff538c
--- /dev/null
+++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueManagerTest.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.settings.MessagingConfiguration;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.wink.json4j.JSONException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.engine.ThreadStatus.ThreadState;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore;
+import org.apache.atlas.odf.core.controlcenter.DefaultThreadManager;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager.ThreadStartupResult;
+import org.apache.atlas.odf.core.controlcenter.TrackerUtil;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaMonitor;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaProducerManager;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class KafkaQueueManagerTest extends ODFTestBase {
+
+	private static Long origRetention;
+	Logger logger = ODFTestLogger.get();
+	String zookeeperConnectString = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();
+
+	@BeforeClass
+	public static void setupTrackerRetention() throws ValidationException {
+		SettingsManager settingsManager = new ODFFactory().create().getSettingsManager();
+		// set up retention so that trackers are kept
+		final MessagingConfiguration messagingConfiguration = settingsManager.getODFSettings().getMessagingConfiguration();
+		origRetention = messagingConfiguration.getAnalysisRequestRetentionMs();
+		messagingConfiguration.setAnalysisRequestRetentionMs(120000000L);
+
+		ODFTestLogger.get().info("Set request retention to " + settingsManager.getODFSettings().getMessagingConfiguration().getAnalysisRequestRetentionMs());
+	}
+
+	@AfterClass
+	public static void cleanupTrackerRetention() throws ValidationException {
+		SettingsManager settingsManager = new ODFFactory().create().getSettingsManager();
+		ODFSettings settings = settingsManager.getODFSettings();
+		settings.getMessagingConfiguration().setAnalysisRequestRetentionMs(origRetention);
+		settingsManager.updateODFSettings(settings);
+	}
+
+	@Test
+	public void testStatusQueue() throws Exception {
+		KafkaQueueManager kqm = new KafkaQueueManager();
+
+		logger.info("Queue manager created");
+		AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null);
+
+		long before = System.currentTimeMillis();
+		tracker.setLastModified(before);
+		int maxEntries = 10;
+		for (int i = 0; i < maxEntries; i++) {
+			tracker.getRequest().setId("id" + i);
+			StatusQueueEntry sqe = new StatusQueueEntry();
+			sqe.setAnalysisRequestTracker(tracker);
+			kqm.enqueueInStatusQueue(sqe);
+
+			//			System.out.println("tracker "+i+" enqueued in status queue");
+		}
+		long after = System.currentTimeMillis();
+		logger.info("Time for enqueueing " + maxEntries + " objects: " + (after - before) + ", " + ((after - before) / maxEntries) + "ms per object");
+		Thread.sleep(100 * maxEntries);
+
+		AnalysisRequestTrackerStore store = new DefaultStatusQueueStore();
+
+		for (int i = 0; i < maxEntries; i++) {
+			logger.info("Querying status " + i);
+			AnalysisRequestTracker queriedTracker = store.query("id" + i);
+			Assert.assertNotNull(queriedTracker);
+			Assert.assertEquals(STATUS.FINISHED, queriedTracker.getStatus());
+		}
+
+		//	Thread.sleep(5000);
+		//	Assert.fail("you fail");
+		logger.info("Test testStatusQueue finished");
+	}
+
+	/**
+	 * This test creates a tracker, puts it on the queue, kills the service consumer and creates a dummy consumer
+	 * that moves the service consumer group's offset past the new tracker, leaving the request stuck.
+	 * The status queue watcher is then shut down and its offset is reset so that it consumes from the beginning again
+	 * and can clean up stuck requests.
+	 * Finally, the Kafka queue manager is re-initialized, which brings all consumers back up and triggers the cleanup.
+	 */
+	@Test
+	@Ignore("Adjust once ServiceRuntimes are fully implemented")
+	public void testStuckRequestCleanup() throws JSONException, InterruptedException, ExecutionException, TimeoutException {
+		final AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json",
+				null);
+		tracker.setStatus(STATUS.IN_DISCOVERY_SERVICE_QUEUE);
+		tracker.setNextDiscoveryServiceRequest(0);
+		tracker.setLastModified(System.currentTimeMillis());
+		final String newTrackerId = "KAFKA_QUEUE_MANAGER_09_TEST" + UUID.randomUUID().toString();
+		tracker.getRequest().setId(newTrackerId);
+		DiscoveryServiceRequest dsRequest = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
+		final DiscoveryServiceProperties discoveryServiceRegistrationInfo = new ODFFactory().create().getDiscoveryServiceManager().getDiscoveryServicesProperties()
+				.get(0);
+		dsRequest.setDiscoveryServiceId(discoveryServiceRegistrationInfo.getId());
+		String dsID = dsRequest.getDiscoveryServiceId();
+		String topicName = KafkaQueueManager.SERVICE_TOPIC_PREFIX + dsID;
+		//Add tracker to queue, set offset behind request so that it should be cleaned up
+
+		String consumerGroupId = "odf-topic-" + dsID + "_group";
+		String threadName = "Dummy_DiscoveryServiceQueueConsumer" + topicName;
+
+		final List<Throwable> multiThreadErrors = new ArrayList<Throwable>();
+		final DefaultThreadManager tm = new DefaultThreadManager();
+		logger.info("shutdown old test 09 consumer and replace with fake doing nothing");
+		for (int no = 0; no < discoveryServiceRegistrationInfo.getParallelismCount(); no++) {
+			tm.shutdownThreads(Collections.singletonList("DiscoveryServiceQueueConsumer" + topicName + "_" + no));
+		}
+		Properties kafkaConsumerProps = getKafkaConsumerConfigProperties(consumerGroupId);
+
+		final long[] producedMsgOffset = new long[1];
+
+		final CountDownLatch msgProcessingLatch = new CountDownLatch(1);
+		ThreadStartupResult created = tm.startUnmanagedThread(threadName, new KafkaQueueConsumer(topicName, kafkaConsumerProps, new QueueMessageProcessor() {
+
+			@Override
+			public void process(ExecutorService executorService, String msg, int partition, long msgOffset) {
+				logger.info("Dequeue without processing " + msgOffset);
+				if (msgOffset == producedMsgOffset[0]) {
+					try {
+						msgProcessingLatch.countDown();
+					} catch (Exception e) {
+						msgProcessingLatch.countDown();
+						multiThreadErrors.add(e);
+					}
+				}
+			}
+
+		}));
+
+		tm.waitForThreadsToBeReady(30000, Arrays.asList(created));
+
+		String key = tracker.getRequest().getId();
+		String value = JSONUtils.toJSON(tracker);
+
+		new DefaultStatusQueueStore().store(tracker);
+
+		KafkaMonitor kafkaMonitor = new ODFInternalFactory().create(KafkaMonitor.class);
+		List<String> origQueueConsumers = kafkaMonitor.getConsumerGroups(zookeeperConnectString, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE);
+		logger.info("Found status consumers: " + origQueueConsumers.toString() + ", shutting down StatusWatcher");
+
+		//kill status queue watcher so that it is restarted when queue manager is initialized and detects stuck requests
+		tm.shutdownThreads(Collections.singletonList("StatusWatcher" + KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE + "_0"));
+
+		int maxWaitForConsumerDeath = 60;
+		while (tm.getStateOfUnmanagedThread("StatusWatcher" + KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE + "_0") != ThreadState.NON_EXISTENT
+				&& tm.getStateOfUnmanagedThread("StatusWatcher" + KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE + "_0") != ThreadState.FINISHED && maxWaitForConsumerDeath > 0) {
+			maxWaitForConsumerDeath--;
+			Thread.sleep(500);
+		}
+
+		logger.info("Only 1 consumer left? " + maxWaitForConsumerDeath + " : " + tm.getStateOfUnmanagedThread("StatusWatcher" + KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE + "_0"));
+		logger.info("set offset for status consumer to beginning so that it consumes from the beginning when restarting");
+		final int offset = 1000000;
+		for (String statusConsumerGroup : origQueueConsumers) {
+			if (statusConsumerGroup.contains("DSStatusWatcherConsumerGroup")) {
+				boolean success = false;
+				int retryCount = 0;
+				final int maxOffsetRetry = 20;
+				while (!success && retryCount < maxOffsetRetry) {
+					success = kafkaMonitor.setOffset(zookeeperConnectString, statusConsumerGroup, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE, 0, offset);
+					retryCount++;
+					Thread.sleep(500);
+				}
+
+				Assert.assertNotEquals(retryCount, maxOffsetRetry);
+				Assert.assertTrue(success);
+			}
+		}
+
+		new ODFInternalFactory().create(KafkaProducerManager.class).sendMsg(topicName, key, value, new Callback() {
+
+			@Override
+			public void onCompletion(RecordMetadata metadata, Exception exception) {
+				producedMsgOffset[0] = metadata.offset();
+			}
+		});
+
+		final boolean await = msgProcessingLatch.await(240, TimeUnit.SECONDS);
+		Assert.assertTrue(await);
+		if (await) {
+			logger.info("run after message consumption...");
+			AnalysisRequestTrackerStore store = new ODFInternalFactory().create(AnalysisRequestTrackerStore.class);
+			AnalysisRequestTracker storeTracker = store.query(tracker.getRequest().getId());
+			Assert.assertEquals(tracker.getRequest().getId(), storeTracker.getRequest().getId());
+			Assert.assertEquals(STATUS.IN_DISCOVERY_SERVICE_QUEUE, storeTracker.getStatus());
+
+			//start odf and cleanup here...
+			logger.info("shutdown all threads and restart ODF");
+			tm.shutdownAllUnmanagedThreads();
+
+			int threadKillRetry = 0;
+			while (tm.getNumberOfRunningThreads() > 0 && threadKillRetry < 20) {
+				Thread.sleep(500);
+				threadKillRetry++;
+			}
+
+			logger.info("All threads down, restart ODF " + threadKillRetry);
+
+			// Initialize analysis manager
+			new ODFFactory().create().getAnalysisManager();
+
+			kafkaMonitor = new ODFInternalFactory().create(KafkaMonitor.class);
+			origQueueConsumers = kafkaMonitor.getConsumerGroups(zookeeperConnectString, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE);
+			int healthRetrieveRetry = 0;
+			//wait up to 120 secs (240 * 500 ms) for the status consumer to come up. Once it is up, ODF has been restarted successfully and we can continue
+			while (origQueueConsumers.isEmpty() && healthRetrieveRetry < 240) {
+				healthRetrieveRetry++;
+				Thread.sleep(500);
+				origQueueConsumers = kafkaMonitor.getConsumerGroups(zookeeperConnectString, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE);
+			}
+			Assert.assertNotEquals(healthRetrieveRetry, 240);
+
+			logger.info("initialized, wait for cleanup ... " + healthRetrieveRetry);
+			Thread.sleep(5000);
+			logger.info("Found health consumers: " + origQueueConsumers.toString());
+			logger.info("hopefully cleaned up ...");
+			AnalysisRequestTracker storedTracker = store.query(tracker.getRequest().getId());
+			Assert.assertEquals(STATUS.ERROR, storedTracker.getStatus());
+			logger.info("DONE CLEANING UP, ALL FINE");
+		}
+
+		Assert.assertEquals(0, multiThreadErrors.size());
+	}
+
+	public Properties getKafkaConsumerConfigProperties(String consumerGroupID) {
+		SettingsManager odfConfig = new ODFFactory().create().getSettingsManager();
+		Properties kafkaConsumerProps = odfConfig.getKafkaConsumerProperties();
+		kafkaConsumerProps.put("group.id", consumerGroupID);
+		if (zookeeperConnectString != null) {
+			kafkaConsumerProps.put("zookeeper.connect", zookeeperConnectString);
+		}
+
+		kafkaConsumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+		kafkaConsumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+		kafkaConsumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+		StringBuilder bld = new StringBuilder();
+		final Iterator<String> iterator = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperConnectString).iterator();
+		while (iterator.hasNext()) {
+			bld.append(iterator.next());
+			if (iterator.hasNext()) {
+				bld.append(",");
+			}
+		}
+		kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bld.toString());
+		kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+		return kafkaConsumerProps;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MessageSearchConsumerTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MessageSearchConsumerTest.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MessageSearchConsumerTest.java
new file mode 100755
index 0000000..35b09e2
--- /dev/null
+++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MessageSearchConsumerTest.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.atlas.odf.core.messaging.kafka.MessageSearchConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.api.engine.PartitionOffsetInfo;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaMonitor;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+
+import kafka.admin.AdminUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+public class MessageSearchConsumerTest extends ODFTestBase {
+	private static final String TEST_SEARCH_STRING = "TEST_STRING_" + UUID.randomUUID().toString();
+	private static final String TEST_SEARCH_FAILURE_STRING = "TEST_FAILURE_STRING";
+	static Logger logger = ODFTestLogger.get();
+	final static String topicName = "MessageSearchConsumerTest" + UUID.randomUUID().toString();
+	private static final int PERFORMANCE_MSG_COUNT = 1000;
+	static String zookeeperHost = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();
+	private KafkaProducer<String, String> producer;
+
+	@BeforeClass
+	public static void createTopic() {
+		ZkClient zkClient = new ZkClient(zookeeperHost, 5000, 5000, ZKStringSerializer$.MODULE$);
+		try {
+			ZkUtils utils = new ZkUtils(zkClient, new ZkConnection(zookeeperHost), false);
+			if (!AdminUtils.topicExists(utils, topicName)) {
+				AdminUtils.createTopic(utils, topicName, 2, 1, new Properties(), KafkaQueueManager.DEFAULT_RACK_AWARE_MODE);
+			}
+		} finally {
+			zkClient.close();
+		}
+	}
+
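+	/**
+	 * Produces a larger batch of dummy messages followed by a unique marker message,
+	 * then verifies that MessageSearchConsumer finds the marker and does not report
+	 * a string that was never produced.
+	 */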
+	@Test
+	public void testMsgSearchPerformance() throws InterruptedException, ExecutionException, TimeoutException {
+		logger.info("Producing msgs");
+		for (int no = 0; no < PERFORMANCE_MSG_COUNT; no++) {
+			sendMsg("DUMMY_MSG" + no);
+		}
+		sendMsg(TEST_SEARCH_STRING);
+		logger.info("Done producing ...");
+		Thread.sleep(200);
+
+		final ThreadManager threadManager = new ODFInternalFactory().create(ThreadManager.class);
+		final CountDownLatch searchLatch = new CountDownLatch(1);
+		threadManager.startUnmanagedThread(UUID.randomUUID().toString() + "_searchThread", new MessageSearchConsumer(topicName, new MessageSearchConsumer.SearchCompletedCallback() {
+
+			@Override
+			public void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap) {
+				logger.info("Done searching " + msgPositionMap.get(TEST_SEARCH_STRING).getOffset());
+				Assert.assertTrue(msgPositionMap.get(TEST_SEARCH_STRING).getOffset() > -1);
+				searchLatch.countDown();
+			}
+		}, Arrays.asList(TEST_SEARCH_STRING)));
+
+		boolean await = searchLatch.await(5, TimeUnit.SECONDS);
+		if (await) {
+			logger.info("Messages searched in time");
+		} else {
+			logger.warning("Couldn't finish search in time");
+		}
+
+		final CountDownLatch failureSearchLatch = new CountDownLatch(1);
+		threadManager.startUnmanagedThread(UUID.randomUUID().toString() + "_searchThread", new MessageSearchConsumer(topicName, new MessageSearchConsumer.SearchCompletedCallback() {
+
+			@Override
+			public void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap) {
+				logger.info("Done searching " + msgPositionMap.toString());
+				Assert.assertFalse(msgPositionMap.containsKey(TEST_SEARCH_FAILURE_STRING));
+				failureSearchLatch.countDown();
+			}
+		}, Arrays.asList(TEST_SEARCH_FAILURE_STRING)));
+
+		await = failureSearchLatch.await(5, TimeUnit.SECONDS);
+		if (await) {
+			logger.info("Messages searched in time");
+		} else {
+			logger.warning("Couldn't finish search in time");
+		}
+	}
+
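+	/**
+	 * Sends a single marker message and verifies that MessageSearchConsumer finds it,
+	 * while a string that was never produced must not appear in the result map.
+	 */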
+	@Test
+	public void testMsgSearchSuccessAndFailure() throws InterruptedException, ExecutionException, TimeoutException {
+		sendMsg(TEST_SEARCH_STRING);
+
+		Thread.sleep(200);
+
+		final ThreadManager threadManager = new ODFInternalFactory().create(ThreadManager.class);
+		final CountDownLatch searchLatch = new CountDownLatch(1);
+		threadManager.startUnmanagedThread(UUID.randomUUID().toString() + "_searchThread", new MessageSearchConsumer(topicName, new MessageSearchConsumer.SearchCompletedCallback() {
+
+			@Override
+			public void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap) {
+				logger.info("Done searching " + msgPositionMap.get(TEST_SEARCH_STRING).getOffset());
+				Assert.assertTrue(msgPositionMap.get(TEST_SEARCH_STRING).getOffset() > -1);
+				searchLatch.countDown();
+			}
+		}, Arrays.asList(TEST_SEARCH_STRING)));
+
+		boolean await = searchLatch.await(5, TimeUnit.SECONDS);
+		if (await) {
+			logger.info("Messages searched in time");
+		} else {
+			logger.warning("Couldn't finish search in time");
+		}
+
+		final CountDownLatch failureSearchLatch = new CountDownLatch(1);
+		threadManager.startUnmanagedThread(UUID.randomUUID().toString() + "_searchThread", new MessageSearchConsumer(topicName, new MessageSearchConsumer.SearchCompletedCallback() {
+
+			@Override
+			public void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap) {
+				logger.info("Done searching " + msgPositionMap);
+				Assert.assertFalse(msgPositionMap.containsKey(TEST_SEARCH_FAILURE_STRING));
+				failureSearchLatch.countDown();
+			}
+		}, Arrays.asList(TEST_SEARCH_FAILURE_STRING)));
+
+		await = failureSearchLatch.await(5, TimeUnit.SECONDS);
+		if (await) {
+			logger.info("Messages searched in time");
+		} else {
+			logger.warning("Couldn't finish search in time");
+		}
+	}
+
+	void sendMsg(String msg) throws InterruptedException, ExecutionException, TimeoutException {
+		final KafkaProducer<String, String> producer = getProducer();
+		ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, UUID.randomUUID().toString(), msg);
+		producer.send(producerRecord).get(15000, TimeUnit.MILLISECONDS);
+	}
+
+	private KafkaProducer<String, String> getProducer() {
+		if (this.producer == null) {
+			SettingsManager odfConfig = new ODFFactory().create().getSettingsManager();
+			Properties props = odfConfig.getKafkaProducerProperties();
+			final Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperHost).iterator();
+			StringBuilder brokersString = new StringBuilder();
+			while (brokers.hasNext()) {
+				brokersString.append(brokers.next());
+				if (brokers.hasNext()) {
+					brokersString.append(",");
+				}
+			}
+			props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString.toString());
+			producer = new KafkaProducer<String, String>(props);
+		}
+		return producer;
+	}
+
+	@After
+	public void closeProducer() {
+		// only close a producer that was actually created; calling getProducer() here would lazily create one just to close it
+		if (producer != null) {
+			producer.close();
+			producer = null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MultiPartitionConsumerTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MultiPartitionConsumerTest.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MultiPartitionConsumerTest.java
new file mode 100755
index 0000000..f97dd4e
--- /dev/null
+++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MultiPartitionConsumerTest.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.atlas.odf.api.engine.ThreadStatus;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInitializer;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaMonitor;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueConsumer;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager.ThreadStartupResult;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+
+import kafka.admin.AdminUtils;
+import kafka.common.TopicExistsException;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+public class MultiPartitionConsumerTest extends ODFTestcase {
+	static Logger logger = ODFTestLogger.get();
+	final static String topicName = "my_dummy_test_topic" + UUID.randomUUID().toString();
+	static String zookeeperHost = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();
+	static final int PARTITION_COUNT = 3;
+	private static final int MSG_PER_PARTITION = 5;
+	private final ThreadManager threadManager = new ODFInternalFactory().create(ThreadManager.class);
+
+	@BeforeClass
+	public static void setupTopic() {
+		ZkClient zkClient = null;
+		try {
+			zkClient = new ZkClient(zookeeperHost, 5000, 5000, ZKStringSerializer$.MODULE$);
+			// create the topic with PARTITION_COUNT partitions and replication factor 1; no special
+			// per-topic config is needed, and TopicExistsException is caught below if it already exists
+			logger.log(Level.FINE, "Creating topic ''{0}''", topicName);
+			//FIXME zkUtils isSecure parameter? Only with SSL! --> parse zkhosts?
+			AdminUtils.createTopic(new ZkUtils(zkClient, new ZkConnection(zookeeperHost), false), topicName, PARTITION_COUNT, 1, new Properties(), KafkaQueueManager.DEFAULT_RACK_AWARE_MODE);
+			logger.log(Level.FINE, "Topic ''{0}'' created", topicName);
+		} catch (TopicExistsException ex) {
+			logger.log(Level.FINE, "Topic ''{0}'' already exists.", topicName);
+		} catch (ZkTimeoutException zkte) {
+			logger.log(Level.SEVERE, "Could not connect to the Zookeeper instance at ''{0}''. Please ensure that Zookeeper is running", zookeeperHost);
+		} finally {
+			if (zkClient != null) {
+				zkClient.close();
+			}
+		}
+	}
+
+	@After
+	public void cleanupConsumers() {
+		logger.info("Cleaning up consumers...");
+		logger.info("----------------------------------  Stopping ODF...");
+		ODFInitializer.stop();
+		logger.info("----------------------------------  Starting ODF...");
+		ODFInitializer.start();
+		logger.info("----------------------------------  ODF started.");
+	}
+
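+	/**
+	 * Starts one consumer per partition whose processing is artificially delayed and verifies
+	 * that all produced messages are still consumed and the consumer threads keep running
+	 * (i.e. slow processing does not kill the consumers).
+	 */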
+	@Test
+	public void testMultiPartitionDelayedConsumption() throws InterruptedException, ExecutionException {
+		Properties kafkaConsumerProperties = getConsumerProps();
+		final List<String> consumedMsgs = new ArrayList<String>();
+		List<ThreadStartupResult> startupList = new ArrayList<ThreadStartupResult>();
+
+		final String threadPrefix = "TEST_CONSUMER_RETRY_RUNNING_";
+		final int processingDelay = 2000;
+		for (int no = 0; no < PARTITION_COUNT; no++) {
+			final int currentThread = no;
+			final QueueMessageProcessor requestConsumer = new QueueMessageProcessor() {
+
+				@Override
+				public void process(ExecutorService executorService, String msg, int partition, long msgOffset) {
+					try {
+						Thread.sleep(processingDelay);
+					} catch (InterruptedException e) {
+						// restore the interrupt flag instead of swallowing the interruption
+						Thread.currentThread().interrupt();
+					}
+					consumedMsgs.add(msg);
+					logger.info("process " + msg + " in thread " + currentThread);
+				}
+			};
+
+			KafkaQueueConsumer cnsmr = new KafkaQueueConsumer(topicName, kafkaConsumerProperties, requestConsumer);
+
+			final String consumerThread = threadPrefix + no;
+			final ThreadStartupResult startUnmanagedThread = threadManager.startUnmanagedThread(consumerThread, cnsmr);
+			startupList.add(startUnmanagedThread);
+		}
+		try {
+			threadManager.waitForThreadsToBeReady(30000, startupList);
+			for (int no = 0; no < PARTITION_COUNT; no++) {
+				for (int msgNo = 0; msgNo < MSG_PER_PARTITION; msgNo++) {
+					sendMsg("Partition " + no + " msg " + msgNo);
+				}
+			}
+
+			int totalWait = 0;
+			while (totalWait < PARTITION_COUNT * MSG_PER_PARTITION * processingDelay + 10000 && consumedMsgs.size() < PARTITION_COUNT * MSG_PER_PARTITION) {
+				Thread.sleep(2000);
+				totalWait += 2000;
+			}
+
+			logger.info("Done with all messages after " + totalWait);
+
+			Assert.assertEquals(PARTITION_COUNT * MSG_PER_PARTITION, consumedMsgs.size());
+
+			for (int no = 0; no < PARTITION_COUNT; no++) {
+				final ThreadStatus.ThreadState stateOfUnmanagedThread = threadManager.getStateOfUnmanagedThread(threadPrefix + no);
+				Assert.assertEquals(ThreadStatus.ThreadState.RUNNING, stateOfUnmanagedThread);
+			}
+		} catch (TimeoutException e) {
+			Assert.fail("Consumer could not be started on time");
+		}
+	}
+
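+	/**
+	 * Starts one consumer per partition and verifies that all messages produced to the
+	 * multi-partition topic are consumed and the consumer threads keep running.
+	 */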
+	@Test
+	public void testMultiPartitionConsumption() throws InterruptedException, ExecutionException {
+		Properties kafkaConsumerProperties = getConsumerProps();
+		final List<String> consumedMsgs = new ArrayList<String>();
+		List<ThreadStartupResult> startupList = new ArrayList<ThreadStartupResult>();
+
+		final String threadPrefix = "TEST_CONSUMER_RETRY_RUNNING_";
+		for (int no = 0; no < PARTITION_COUNT; no++) {
+			final int currentThread = no;
+			final QueueMessageProcessor requestConsumer = new QueueMessageProcessor() {
+
+				@Override
+				public void process(ExecutorService executorService, String msg, int partition, long msgOffset) {
+					consumedMsgs.add(msg);
+					logger.info("process " + msg + " in thread " + currentThread);
+				}
+			};
+
+			KafkaQueueConsumer cnsmr = new KafkaQueueConsumer(topicName, kafkaConsumerProperties, requestConsumer);
+
+			final String consumerThread = threadPrefix + no;
+			final ThreadStartupResult startUnmanagedThread = threadManager.startUnmanagedThread(consumerThread, cnsmr);
+			startupList.add(startUnmanagedThread);
+		}
+		try {
+			threadManager.waitForThreadsToBeReady(30000, startupList);
+			for (int no = 0; no < PARTITION_COUNT; no++) {
+				for (int msgNo = 0; msgNo < MSG_PER_PARTITION; msgNo++) {
+					sendMsg("Partition " + no + " msg " + msgNo);
+				}
+			}
+
+			int totalWait = 0;
+			boolean done = false;
+			while (totalWait < 30 && !done) {
+				if (consumedMsgs.size() == PARTITION_COUNT * MSG_PER_PARTITION) {
+					done = true;
+				}
+				totalWait++;
+				Thread.sleep(500);
+			}
+
+			Assert.assertEquals(PARTITION_COUNT * MSG_PER_PARTITION, consumedMsgs.size());
+
+			for (int no = 0; no < PARTITION_COUNT; no++) {
+				final ThreadStatus.ThreadState stateOfUnmanagedThread = threadManager.getStateOfUnmanagedThread(threadPrefix + no);
+				Assert.assertEquals(ThreadStatus.ThreadState.RUNNING, stateOfUnmanagedThread);
+			}
+		} catch (TimeoutException e) {
+			Assert.fail("Consumer could not be started on time");
+		}
+	}
+
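+	/**
+	 * Each processor throws an exception for the first MAX_PROCESSING_EXCEPTIONS - 1 attempts
+	 * and succeeds afterwards; verifies that messages are retried and eventually consumed
+	 * while the consumer threads keep running.
+	 */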
+	@Test
+	public void testMultiPartitionExceptionAndRetryDuringProcessing() throws InterruptedException, ExecutionException {
+		Properties kafkaConsumerProperties = getConsumerProps();
+		final List<String> consumedMsgs = new ArrayList<String>();
+		List<ThreadStartupResult> startupList = new ArrayList<ThreadStartupResult>();
+
+		final String threadPrefix = "TEST_CONSUMER_RETRY_RUNNING_";
+		for (int no = 0; no < PARTITION_COUNT; no++) {
+			final int currentThread = no;
+			final QueueMessageProcessor requestConsumer = new QueueMessageProcessor() {
+
+				private int excCount = 0;
+
+				@Override
+				public void process(ExecutorService executorService, String msg, int partition, long msgOffset) {
+					if (excCount < KafkaQueueConsumer.MAX_PROCESSING_EXCEPTIONS - 1) {
+						excCount++;
+						logger.info("Throw exception " + excCount + " on consumer " + currentThread);
+						throw new RuntimeException("Forced error on consumer");
+					}
+					consumedMsgs.add(msg);
+					logger.info("process " + msg + " in thread " + currentThread);
+				}
+			};
+
+			KafkaQueueConsumer cnsmr = new KafkaQueueConsumer(topicName, kafkaConsumerProperties, requestConsumer);
+
+			final String consumerThread = threadPrefix + no;
+			final ThreadStartupResult startUnmanagedThread = threadManager.startUnmanagedThread(consumerThread, cnsmr);
+			startupList.add(startUnmanagedThread);
+		}
+		try {
+			threadManager.waitForThreadsToBeReady(30000, startupList);
+			for (int no = 0; no < PARTITION_COUNT; no++) {
+				for (int msgNo = 0; msgNo < MSG_PER_PARTITION; msgNo++) {
+					sendMsg("Partition " + no + " msg " + msgNo);
+				}
+			}
+
+			int totalWait = 0;
+			boolean done = false;
+			while (totalWait < 30 && !done) {
+				if (consumedMsgs.size() == PARTITION_COUNT * MSG_PER_PARTITION) {
+					done = true;
+				}
+				totalWait++;
+				Thread.sleep(500);
+			}
+			Assert.assertEquals(PARTITION_COUNT * MSG_PER_PARTITION, consumedMsgs.size());
+
+			for (int no = 0; no < PARTITION_COUNT; no++) {
+				final ThreadStatus.ThreadState stateOfUnmanagedThread = threadManager.getStateOfUnmanagedThread(threadPrefix + no);
+				Assert.assertEquals(ThreadStatus.ThreadState.RUNNING, stateOfUnmanagedThread);
+			}
+		} catch (TimeoutException e) {
+			Assert.fail("Consumer could not be started on time");
+		}
+	}
+
+	private Properties getConsumerProps() {
+		SettingsManager odfConfig = new ODFFactory().create().getSettingsManager();
+		Properties kafkaConsumerProperties = odfConfig.getKafkaConsumerProperties();
+		final String groupId = "retrying-dummy-consumer";
+		kafkaConsumerProperties.put("group.id", groupId);
+		kafkaConsumerProperties.put("zookeeper.connect", zookeeperHost);
+		final Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperHost).iterator();
+		StringBuilder brokersString = new StringBuilder();
+		while (brokers.hasNext()) {
+			brokersString.append(brokers.next());
+			if (brokers.hasNext()) {
+				brokersString.append(",");
+			}
+		}
+		kafkaConsumerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString.toString());
+		kafkaConsumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+		kafkaConsumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+		kafkaConsumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+
+		return kafkaConsumerProperties;
+	}
+
+	void sendMsg(String msg) throws InterruptedException, ExecutionException, TimeoutException {
+		SettingsManager odfConfig = new ODFFactory().create().getSettingsManager();
+		Properties props = odfConfig.getKafkaProducerProperties();
+		final Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperHost).iterator();
+		StringBuilder brokersString = new StringBuilder();
+		while (brokers.hasNext()) {
+			brokersString.append(brokers.next());
+			if (brokers.hasNext()) {
+				brokersString.append(",");
+			}
+		}
+		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString.toString());
+		//Should we use a custom partitioner? we could try to involve consumer offsets and always put on "emptiest" partition
+		//props.put("partitioner.class", TestMessagePartitioner.class);
+
+		final KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
+		ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, UUID.randomUUID().toString(), msg);
+		producer.send(producerRecord).get(3000, TimeUnit.MILLISECONDS);
+		producer.close();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceErrorTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceErrorTest.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceErrorTest.java
new file mode 100755
index 0000000..d1c9810
--- /dev/null
+++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceErrorTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest;
+
+public class ParallelServiceErrorTest extends ODFTestcase {
+	private static final int NUMBER_OF_QUEUED_REQUESTS = 1;
+	Logger log = ODFTestLogger.get();
+
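+	/**
+	 * Runs one succeeding and one failing analysis request in parallel and checks that both
+	 * are active at the same point in time and end up in the expected final states
+	 * (FINISHED and ERROR).
+	 */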
+	@Test
+	public void runDataSetsInParallelError() throws Exception {
+		runDataSetsInParallelAndCheckResult(Arrays.asList(new String[] { "successID1", "errorID2" }), AnalysisRequestStatus.State.FINISHED, AnalysisRequestStatus.State.ERROR);
+	}
+
+	private void runDataSetsInParallelAndCheckResult(List<String> dataSetIDs, AnalysisRequestStatus.State... expectedState) throws Exception {
+		log.info("Running data sets in parallel: " + dataSetIDs);
+		log.info("Expected states: " + Arrays.toString(expectedState));
+		AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+
+		List<AnalysisRequest> requests = new ArrayList<AnalysisRequest>();
+		List<AnalysisResponse> responses = new ArrayList<AnalysisResponse>();
+		List<String> idList = new ArrayList<String>();
+
+		for (int no = 0; no < NUMBER_OF_QUEUED_REQUESTS; no++) {
+			for (String dataSet : dataSetIDs) {
+				final AnalysisRequest req = ODFAPITest.createAnalysisRequest(Arrays.asList(dataSet + UUID.randomUUID().toString()));
+				AnalysisResponse resp = analysisManager.runAnalysis(req);
+				req.setId(resp.getId());
+				requests.add(req);
+				idList.add(resp.getId());
+				responses.add(resp);
+			}
+		}
+		log.info("Parallel requests started: " + idList.toString());
+
+		Assert.assertEquals(NUMBER_OF_QUEUED_REQUESTS * dataSetIDs.size(), requests.size());
+		Assert.assertEquals(NUMBER_OF_QUEUED_REQUESTS * dataSetIDs.size(), responses.size());
+
+		// check that requests are processed in parallel: 
+		//   there must be a point in time where both requests are in status "active"
+		log.info("Polling for status of parallel request...");
+		boolean foundPointInTimeWhereBothRequestsAreActive = false;
+		int maxPolls = ODFAPITest.MAX_NUMBER_OF_POLLS;
+		List<AnalysisRequestStatus.State> allSingleStates = new ArrayList<AnalysisRequestStatus.State>();
+		do {
+			int foundActive = 0;
+			allSingleStates.clear();
+			for (AnalysisRequest request : requests) {
+				final AnalysisRequestStatus.State state = analysisManager.getAnalysisRequestStatus(request.getId()).getState();
+				if (state == AnalysisRequestStatus.State.ACTIVE) {
+					log.info("ACTIVE: " + request.getId() + " foundactive: " + foundActive);
+					foundActive++;
+				} else {
+					log.info("NOT ACTIVE " + request.getId() + " _ " + state);
+				}
+				allSingleStates.add(state);
+			}
+			if (foundActive > 1) {
+				foundPointInTimeWhereBothRequestsAreActive = true;
+			}
+
+			maxPolls--;
+			Thread.sleep(ODFAPITest.WAIT_MS_BETWEEN_POLLING);
+			// keep polling until no request is ACTIVE or QUEUED any more (all requests reached a final state) or maxPolls is exhausted
+		} while (maxPolls > 0 && !Utils.containsNone(allSingleStates, new AnalysisRequestStatus.State[] { AnalysisRequestStatus.State.ACTIVE, AnalysisRequestStatus.State.QUEUED }));
+
+		Assert.assertTrue(maxPolls > 0);
+		Assert.assertTrue(foundPointInTimeWhereBothRequestsAreActive);
+		Assert.assertTrue(allSingleStates.containsAll(Arrays.asList(expectedState)));
+	}
+}