You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/06/28 05:57:20 UTC
[07/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueManager.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueManager.java b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueManager.java
new file mode 100755
index 0000000..e759ecc
--- /dev/null
+++ b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueManager.java
@@ -0,0 +1,488 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging.kafka;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.engine.KafkaGroupOffsetInfo;
+import org.apache.atlas.odf.api.engine.KafkaStatus;
+import org.apache.atlas.odf.api.engine.KafkaTopicStatus;
+import org.apache.atlas.odf.api.engine.ThreadStatus;
+import org.apache.atlas.odf.api.settings.KafkaMessagingConfiguration;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+import org.apache.atlas.odf.core.notification.NotificationListener;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.engine.MessagingStatus;
+import org.apache.atlas.odf.api.engine.PartitionOffsetInfo;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage;
+import org.apache.atlas.odf.core.controlcenter.AdminQueueProcessor;
+import org.apache.atlas.odf.core.controlcenter.ConfigChangeQueueProcessor;
+import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore.StatusQueueProcessor;
+import org.apache.atlas.odf.core.controlcenter.DiscoveryServiceStarter;
+import org.apache.atlas.odf.core.controlcenter.ExecutorServiceFactory;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
+import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager.ThreadStartupResult;
+import org.apache.atlas.odf.core.controlcenter.TrackerUtil;
+import org.apache.atlas.odf.core.notification.NotificationManager;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.common.TopicExistsException;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+public class KafkaQueueManager implements DiscoveryServiceQueueManager {
+
+ // Name of the Kafka topic used as status queue
+ public static final String TOPIC_NAME_STATUS_QUEUE = "odf-status-topic";
+ // Name of the Kafka topic used as admin queue
+ public static final String TOPIC_NAME_ADMIN_QUEUE = "odf-admin-topic";
+ // Message key used for every message put on the admin queue
+ public static final String ADMIN_QUEUE_KEY = "odf-admin-queue-key";
+ // Prefix used when building per-discovery-service topic names for the messaging status
+ public static final String SERVICE_TOPIC_PREFIX = "odf-topic-";
+
+ // Rack awareness mode passed to AdminUtils.createTopic
+ public static final RackAwareMode DEFAULT_RACK_AWARE_MODE = RackAwareMode.Disabled$.MODULE$;
+
+ //use static UUID so that no unnecessary consumer threads are started
+ // (the id is appended to the status and admin watcher consumer group ids)
+ private final static String UNIQUE_SESSION_THREAD_ID = UUID.randomUUID().toString();
+
+ // Per-thread startup budget; start() multiplies it by the number of scheduled threads
+ private final static int THREAD_STARTUP_TIMEOUT_MS = 5000;
+
+ // Thread ids of the consumers started by start(); null while the manager is not started
+ private static List<String> queueConsumerNames = null;
+ // Guards queueConsumerNames in start() and stop()
+ private static Object startLock = new Object();
+
+ private final static Logger logger = Logger.getLogger(KafkaQueueManager.class.getName());
+
+ // Collaborators obtained from the ODFInternalFactory in the constructor
+ private ThreadManager threadManager;
+ private SettingsManager odfConfig;
+ private String zookeeperConnectString;
+
+ /**
+  * Creates the queue manager and obtains its collaborators (thread manager with
+  * executor service, Zookeeper connect string, settings manager) from the
+  * {@link ODFInternalFactory}.
+  */
+ public KafkaQueueManager() {
+     final ODFInternalFactory internalFactory = new ODFInternalFactory();
+     this.threadManager = internalFactory.create(ThreadManager.class);
+     final ExecutorServiceFactory executorFactory = internalFactory.create(ExecutorServiceFactory.class);
+     this.threadManager.setExecutorService(executorFactory.createExecutorService());
+     final Environment environment = internalFactory.create(Environment.class);
+     this.zookeeperConnectString = environment.getZookeeperConnectString();
+     this.odfConfig = internalFactory.create(SettingsManager.class);
+ }
+
+
+ /**
+  * Builds the Kafka consumer configuration for a consumer group.
+  *
+  * Starts from the consumer properties provided by the settings manager and adds
+  * the group id, the Zookeeper connect string (if one is known), the offset reset
+  * policy, String deserializers for key and value, the bootstrap servers and
+  * disables auto commit.
+  *
+  * @param consumerGroupID value for the "group.id" property
+  * @param consumeFromEnd  if true the offset reset policy is "latest", otherwise "earliest"
+  * @return the populated consumer properties
+  */
+ public Properties getConsumerConfigProperties(String consumerGroupID, boolean consumeFromEnd) {
+     final String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
+     final String offsetReset = consumeFromEnd ? "latest" : "earliest";
+
+     final Properties consumerConfig = odfConfig.getKafkaConsumerProperties();
+     consumerConfig.put("group.id", consumerGroupID);
+     if (zookeeperConnectString != null) {
+         consumerConfig.put("zookeeper.connect", zookeeperConnectString);
+     }
+     consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
+     consumerConfig.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
+     consumerConfig.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
+     consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
+     consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+     return consumerConfig;
+ }
+
+ /**
+  * Asks the {@link KafkaMonitor} for the brokers known for the configured
+  * Zookeeper connect string and joins them into one comma separated string.
+  *
+  * @return comma separated broker list, empty if no broker was returned
+  */
+ private String getBootstrapServers() {
+     final KafkaMonitor monitor = new ODFInternalFactory().create(KafkaMonitor.class);
+     final StringBuilder joined = new StringBuilder();
+     String separator = "";
+     for (String broker : monitor.getBrokers(zookeeperConnectString)) {
+         joined.append(separator).append(broker);
+         separator = ",";
+     }
+     return joined.toString();
+ }
+
+ /**
+  * Creates the given Kafka topic through Zookeeper unless it already exists.
+  *
+  * The replication factor is read from the ODF Kafka messaging configuration.
+  * After a topic was created the method waits briefly so that the topic exists
+  * before consumers are started.
+  *
+  * @param topicName      name of the topic to check / create
+  * @param partitionCount number of partitions used when the topic is created
+  * @param props          properties providing "zookeeper.connect",
+  *                       "zookeeperSessionTimeoutMs" and "zookeeperConnectionTimeoutMs"
+  * @throws ZkTimeoutException if no connection to Zookeeper could be established
+  */
+ protected void createTopicIfNotExists(String topicName, int partitionCount, Properties props) {
+     String zkHosts = props.getProperty("zookeeper.connect");
+     final ZkClient zkClient;
+     try {
+         zkClient = new ZkClient(zkHosts, Integer.valueOf(props.getProperty("zookeeperSessionTimeoutMs")),
+                 Integer.valueOf(props.getProperty("zookeeperConnectionTimeoutMs")), ZKStringSerializer$.MODULE$);
+     } catch (ZkTimeoutException zkte) {
+         logger.log(Level.SEVERE, "Could not connect to the Zookeeper instance at ''{0}''. Please ensure that Zookeeper is running", zkHosts);
+         // Without a client nothing below can work; surface the real cause instead of
+         // continuing with a null client.
+         throw zkte;
+     }
+     try {
+         logger.log(Level.FINEST, "Checking if topic ''{0}'' already exists", topicName);
+         // no special per-topic config needed
+         final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHosts), false);
+         if (!AdminUtils.topicExists(zkUtils, topicName)) {
+             logger.log(Level.INFO, "Topic ''{0}'' does not exist, creating it", topicName);
+
+             //FIXME zkUtils isSecure parameter? Only with SSL! --> parse zkhosts?
+             KafkaMessagingConfiguration kafkaConfig = ((KafkaMessagingConfiguration) odfConfig.getODFSettings().getMessagingConfiguration());
+             AdminUtils.createTopic(zkUtils, topicName, partitionCount, kafkaConfig.getKafkaBrokerTopicReplication(),
+                     new Properties(), DEFAULT_RACK_AWARE_MODE);
+             logger.log(Level.FINE, "Topic ''{0}'' created", topicName);
+             //wait before continuing to make sure the topic exists BEFORE consumers are started
+             try {
+                 Thread.sleep(1500);
+             } catch (InterruptedException e) {
+                 // keep the interrupt visible to the caller instead of swallowing it
+                 Thread.currentThread().interrupt();
+                 logger.log(Level.WARNING, "Interrupted while waiting for topic ''{0}'' to become available", topicName);
+             }
+         }
+     } catch (TopicExistsException ex) {
+         // another node created the topic between the check and the create call
+         logger.log(Level.FINE, "Topic ''{0}'' already exists.", topicName);
+     } finally {
+         zkClient.close();
+     }
+ }
+
+
+ /**
+  * @param runtime the service runtime
+  * @return the topic name for the runtime: "odf-runtime-" followed by the runtime name
+  */
+ private String getTopicName(ServiceRuntime runtime) {
+     final String runtimeName = runtime.getName();
+     return "odf-runtime-" + runtimeName;
+ }
+
+ /**
+  * @param runtime the service runtime
+  * @return the consumer group id for the runtime: its topic name followed by "_group"
+  */
+ private String getConsumerGroup(ServiceRuntime runtime) {
+     final String runtimeTopic = getTopicName(runtime);
+     return runtimeTopic + "_group";
+ }
+
+ /**
+  * Schedules the queue consumers of every active service runtime.
+  *
+  * @return the startup results of all runtimes, in iteration order
+  */
+ private List<ThreadStartupResult> scheduleAllRuntimeConsumers() {
+     final List<ThreadStartupResult> startupResults = new ArrayList<ThreadStartupResult>();
+     for (ServiceRuntime activeRuntime : ServiceRuntimes.getActiveRuntimes()) {
+         final List<ThreadStartupResult> perRuntime = scheduleRuntimeConsumers(activeRuntime);
+         startupResults.addAll(perRuntime);
+     }
+     return startupResults;
+ }
+
+ /**
+  * Makes sure a {@link KafkaRuntimeConsumer} thread exists for the given runtime.
+  *
+  * If the thread is not reported as RUNNING, the runtime topic is created (if
+  * needed) and a new unmanaged thread is started. Otherwise a result is returned
+  * that reports "no new thread, ready".
+  *
+  * @param runtime the runtime to consume requests for
+  * @return a list with exactly one startup result
+  */
+ private List<ThreadStartupResult> scheduleRuntimeConsumers(ServiceRuntime runtime) {
+     logger.log(Level.FINER, "Create consumers on queue for runtime ''{0}'' if it doesn't already exist", runtime.getName());
+
+     final String runtimeTopic = getTopicName(runtime);
+     // read entries from beginning if consumer was never initialized
+     final Properties consumerProps = getConsumerConfigProperties(getConsumerGroup(runtime), false);
+     final String consumerThreadName = "RuntimeQueueConsumer" + runtimeTopic;
+     final List<ThreadStartupResult> startupResults = new ArrayList<ThreadStartupResult>();
+
+     final boolean alreadyRunning = threadManager.getStateOfUnmanagedThread(consumerThreadName) == ThreadStatus.ThreadState.RUNNING;
+     if (alreadyRunning) {
+         startupResults.add(new ThreadStartupResult(consumerThreadName) {
+             @Override
+             public boolean isNewThreadCreated() {
+                 return false;
+             }
+
+             @Override
+             public boolean isReady() {
+                 return true;
+             }
+         });
+         return startupResults;
+     }
+
+     createTopicIfNotExists(runtimeTopic, 1, consumerProps);
+     final KafkaRuntimeConsumer runtimeConsumer = new KafkaRuntimeConsumer(runtime, runtimeTopic, consumerProps, new DiscoveryServiceStarter());
+     startupResults.add(threadManager.startUnmanagedThread(consumerThreadName, runtimeConsumer));
+     return startupResults;
+ }
+
+
+ /**
+  * Starts one consumer thread per partition of the given topic.
+  *
+  * The topic is created first if it does not exist. The thread for partition
+  * <code>no</code> is named <code>threadName + "_" + no</code> and uses the
+  * processor at index <code>no</code>. Partitions whose thread is already
+  * RUNNING are reported with a "no new thread, ready" result carrying the same
+  * per-partition thread name.
+  *
+  * @param topicName          topic to consume from
+  * @param partitionCount     number of partitions / threads
+  * @param kafkaConsumerProps consumer configuration shared by all threads
+  * @param threadName         base name of the consumer threads
+  * @param processors         one message processor per partition
+  * @return one startup result per partition
+  * @throws RuntimeException if the number of processors differs from the partition count
+  */
+ private List<ThreadStartupResult> scheduleConsumerThreads(String topicName, int partitionCount, Properties kafkaConsumerProps, String threadName,
+         List<QueueMessageProcessor> processors) {
+     if (processors.size() != partitionCount) {
+         final String msg = "The number of processors must be equal to the partition count in order to support parallel processing";
+         logger.warning(msg);
+         throw new RuntimeException(msg);
+     }
+     createTopicIfNotExists(topicName, partitionCount, kafkaConsumerProps);
+
+     List<ThreadStartupResult> result = new ArrayList<ThreadStartupResult>();
+     for (int no = 0; no < partitionCount; no++) {
+         final String partitionThreadName = threadName + "_" + no;
+         if (threadManager.getStateOfUnmanagedThread(partitionThreadName) != ThreadStatus.ThreadState.RUNNING) {
+             QueueMessageProcessor processor = processors.get(no);
+             ThreadStartupResult created = threadManager.startUnmanagedThread(partitionThreadName, new KafkaQueueConsumer(topicName, kafkaConsumerProps, processor));
+             if (created.isNewThreadCreated()) {
+                 logger.log(Level.INFO, "Created new consumer thread on topic ''{0}'' with group ID ''{1}'', thread name: ''{2}'', properties: ''{3}''",
+                         new Object[] { topicName, kafkaConsumerProps.getProperty("group.id"), partitionThreadName, kafkaConsumerProps.toString() });
+             } else {
+                 logger.log(Level.FINE, "Consumer thread with thread name: ''{0}'' already exists, doing nothing", new Object[] { partitionThreadName });
+             }
+             result.add(created);
+         } else {
+             // Report the running thread under the name it was started with (including the
+             // partition suffix), so that ids collected from the results match real threads.
+             result.add(new ThreadStartupResult(partitionThreadName) {
+                 @Override
+                 public boolean isNewThreadCreated() {
+                     return false;
+                 }
+
+                 @Override
+                 public boolean isReady() {
+                     return true;
+                 }
+             });
+         }
+     }
+     return result;
+ }
+
+ /**
+  * Single-partition convenience wrapper around scheduleConsumerThreads.
+  *
+  * @return the startup result of the one scheduled consumer thread
+  */
+ private ThreadStartupResult scheduleConsumerThread(String topicName, Properties kafkaConsumerProps, String threadName, QueueMessageProcessor processor) {
+     final List<QueueMessageProcessor> singleProcessor = Arrays.asList(processor);
+     final List<ThreadStartupResult> startupResults = scheduleConsumerThreads(topicName, 1, kafkaConsumerProps, threadName, singleProcessor);
+     return startupResults.get(0);
+ }
+
+ /**
+  * Puts the tracker on the topic of the runtime that hosts the discovery service
+  * of the tracker's current request. The request id is used as message key.
+  *
+  * @param tracker the analysis request tracker to enqueue
+  * @throws RuntimeException if the tracker has no current discovery service request
+  *                          or no runtime is found for the discovery service
+  */
+ @Override
+ public void enqueue(AnalysisRequestTracker tracker) {
+     final DiscoveryServiceRequest currentRequest = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
+     if (currentRequest == null) {
+         throw new RuntimeException("Tracker is finished, should not be enqueued");
+     }
+     final String serviceId = currentRequest.getDiscoveryServiceId();
+     currentRequest.setPutOnRequestQueue(System.currentTimeMillis());
+
+     final ServiceRuntime serviceRuntime = ServiceRuntimes.getRuntimeForDiscoveryService(serviceId);
+     if (serviceRuntime == null) {
+         throw new RuntimeException(MessageFormat.format("Service runtime for service ''{0}'' was not found.", serviceId));
+     }
+     final String runtimeTopic = getTopicName(serviceRuntime);
+     final String messageKey = tracker.getRequest().getId();
+     enqueueJSONMessage(runtimeTopic, tracker, messageKey);
+ }
+
+ /**
+  * Serializes the object to JSON and sends it through the KafkaProducerManager.
+  *
+  * @param topicName  topic to send to
+  * @param jsonObject object to serialize
+  * @param key        message key
+  * @throws RuntimeException wrapping the JSONException if serialization fails
+  */
+ private void enqueueJSONMessage(String topicName, Object jsonObject, String key) {
+     final String payload;
+     try {
+         payload = JSONUtils.toJSON(jsonObject);
+     } catch (JSONException e) {
+         throw new RuntimeException(e);
+     }
+     final KafkaProducerManager producer = new ODFInternalFactory().create(KafkaProducerManager.class);
+     producer.sendMsg(topicName, key, payload);
+ }
+
+ /**
+  * Makes sure the status watcher consumer thread on the status topic exists.
+  *
+  * The consumer group id contains the per-session UUID, so each session uses its
+  * own group, and the offset reset policy is "earliest".
+  *
+  * @return a list with exactly one startup result
+  */
+ List<ThreadStartupResult> scheduleStatusQueueConsumers() {
+     logger.log(Level.FINER, "Create consumers on status queue if they don't already exist");
+     List<ThreadStartupResult> results = new ArrayList<ThreadStartupResult>();
+
+     // create consumer thread for the status watcher of all trackers
+     String statusWatcherConsumerGroupID = "DSStatusWatcherConsumerGroup" + UNIQUE_SESSION_THREAD_ID; // have a new group id on each node that reads all from the beginning
+     // always read from beginning for the status queue
+     Properties statusWatcherKafkaConsumerProps = getConsumerConfigProperties(statusWatcherConsumerGroupID, false);
+     final String statusWatcherThreadName = "StatusWatcher" + TOPIC_NAME_STATUS_QUEUE; // a fixed name
+     // the thread is started for partition 0, hence the "_0" suffix on its actual name
+     final String threadNameWithPartition = statusWatcherThreadName + "_0";
+     final ThreadStatus.ThreadState stateOfUnmanagedThread = threadManager.getStateOfUnmanagedThread(threadNameWithPartition);
+     logger.fine("State of status watcher thread: " + stateOfUnmanagedThread);
+     if (stateOfUnmanagedThread != ThreadStatus.ThreadState.RUNNING) {
+         final ThreadStartupResult scheduleConsumerThread = scheduleConsumerThread(TOPIC_NAME_STATUS_QUEUE, statusWatcherKafkaConsumerProps, statusWatcherThreadName,
+                 new StatusQueueProcessor());
+         results.add(scheduleConsumerThread);
+     } else {
+         // report the running thread under its real name (with partition suffix) so the
+         // id can later be used to address the thread
+         results.add(new ThreadStartupResult(threadNameWithPartition) {
+             @Override
+             public boolean isNewThreadCreated() {
+                 return false;
+             }
+
+             @Override
+             public boolean isReady() {
+                 return true;
+             }
+         });
+     }
+
+     return results;
+ }
+
+
+ /**
+  * Sends the status queue entry to the status topic, keyed by its request id.
+  *
+  * @param sqe the entry to enqueue
+  */
+ @Override
+ public void enqueueInStatusQueue(StatusQueueEntry sqe) {
+     final String requestId = StatusQueueEntry.getRequestId(sqe);
+     enqueueJSONMessage(TOPIC_NAME_STATUS_QUEUE, sqe, requestId);
+ }
+
+
+ /**
+  * Makes sure the consumers on the admin topic exist.
+  *
+  * Two consumers are scheduled when the admin watcher is not RUNNING: one in a
+  * per-session consumer group (every node receives events) and one in a common
+  * consumer group (only one node receives events). Both use the "latest" offset
+  * reset policy.
+  *
+  * @return the startup results of the scheduled consumers, or a single
+  *         "no new thread, ready" result if the admin watcher is already running
+  */
+ private List<ThreadStartupResult> scheduleAdminQueueConsumers() {
+     List<ThreadStartupResult> results = new ArrayList<ThreadStartupResult>();
+     //schedule admin queue consumers
+     // consumer group so that every node receives events
+     String adminWatcherConsumerGroupID = "DSAdminQueueConsumerGroup" + UNIQUE_SESSION_THREAD_ID; // have a new group id on each node
+     Properties adminWatcherKafkaConsumerProps = getConsumerConfigProperties(adminWatcherConsumerGroupID, true);
+     final String adminWatcherThreadName = "AdminWatcher" + TOPIC_NAME_ADMIN_QUEUE;
+     // the thread is started for partition 0, hence the "_0" suffix on its actual name
+     final String threadNameWithPartition = adminWatcherThreadName + "_0";
+     if (threadManager.getStateOfUnmanagedThread(threadNameWithPartition) != ThreadStatus.ThreadState.RUNNING) {
+         results.add(scheduleConsumerThread(TOPIC_NAME_ADMIN_QUEUE, adminWatcherKafkaConsumerProps, adminWatcherThreadName, new AdminQueueProcessor()));
+         // consumer group so only one node receives events
+         String distributedAdminConsumerGroup = "DSAdminQueueConsumerGroupCommon";
+         Properties kafkaProps = getConsumerConfigProperties(distributedAdminConsumerGroup, true);
+         final String threadName = "DistributedAdminWatcher";
+         results.add(scheduleConsumerThread(TOPIC_NAME_ADMIN_QUEUE, kafkaProps, threadName, new ConfigChangeQueueProcessor()));
+     } else {
+         // report the running thread under its real name (with partition suffix) so the
+         // id can later be used to address the thread
+         // NOTE(review): the distributed admin watcher is not reported in this branch - confirm this is intended
+         results.add(new ThreadStartupResult(threadNameWithPartition) {
+             @Override
+             public boolean isNewThreadCreated() {
+                 return false;
+             }
+
+             @Override
+             public boolean isReady() {
+                 return true;
+             }
+         });
+     }
+     return results;
+ }
+
+ /**
+  * Sends the admin message to the admin topic using the fixed admin queue key.
+  *
+  * @param message the admin message to enqueue
+  */
+ @Override
+ public void enqueueInAdminQueue(AdminMessage message) {
+     final String adminKey = ADMIN_QUEUE_KEY;
+     enqueueJSONMessage(TOPIC_NAME_ADMIN_QUEUE, message, adminKey);
+ }
+
+ /**
+  * Starts all queue consumers (status, admin, runtime and notification listener
+  * consumers) once and waits until they report ready.
+  *
+  * Does nothing if consumer names are already recorded from a previous start.
+  * The wait timeout is the per-thread startup timeout multiplied by the number
+  * of scheduled threads.
+  *
+  * @throws TimeoutException declared for the wait on thread readiness
+  */
+ @Override
+ public void start() throws TimeoutException {
+     synchronized (startLock) {
+         if (queueConsumerNames != null) {
+             // already started
+             return;
+         }
+         final List<ThreadStartupResult> startupResults = new ArrayList<ThreadStartupResult>();
+         startupResults.addAll(scheduleStatusQueueConsumers());
+         startupResults.addAll(scheduleAdminQueueConsumers());
+         startupResults.addAll(scheduleAllRuntimeConsumers());
+         startupResults.addAll(scheduleNotificationListenerThreads());
+
+         final List<String> startedThreadIds = new ArrayList<String>();
+         for (ThreadStartupResult startupResult : startupResults) {
+             startedThreadIds.add(startupResult.getThreadId());
+         }
+         // recorded before waiting, so stop() knows the threads even if the wait fails
+         queueConsumerNames = startedThreadIds;
+
+         final int totalTimeoutMs = THREAD_STARTUP_TIMEOUT_MS * startupResults.size();
+         this.threadManager.waitForThreadsToBeReady(totalTimeoutMs, startupResults);
+         logger.info("KafkaQueueManager successfully initialized");
+     }
+ }
+
+ /**
+  * Shuts down the consumer threads recorded by start() and forgets their names.
+  * Does nothing if no consumer names are recorded.
+  */
+ public void stop() {
+     synchronized (startLock) {
+         if (queueConsumerNames == null) {
+             return;
+         }
+         threadManager.shutdownThreads(queueConsumerNames);
+         queueConsumerNames = null;
+     }
+ }
+
+ /**
+  * Collects the Kafka status: the known brokers and, for the admin topic, the
+  * status topic and one topic per registered discovery service, the topic status.
+  *
+  * @return a {@link KafkaStatus} describing brokers and topics
+  */
+ @Override
+ public MessagingStatus getMessagingStatus() {
+     final KafkaStatus kafkaStatus = new KafkaStatus();
+     final KafkaMonitor kafkaMonitor = new ODFInternalFactory().create(KafkaMonitor.class);
+     kafkaStatus.setBrokers(kafkaMonitor.getBrokers(zookeeperConnectString));
+
+     final List<String> topicNames = new ArrayList<String>();
+     topicNames.add(KafkaQueueManager.TOPIC_NAME_ADMIN_QUEUE);
+     topicNames.add(KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE);
+     // NOTE(review): the request topics created in this class are named "odf-runtime-<runtime>",
+     // while the names built here use SERVICE_TOPIC_PREFIX + service id - confirm these match real topics
+     for (DiscoveryServiceProperties serviceProps : new ODFFactory().create().getDiscoveryServiceManager().getDiscoveryServicesProperties()) {
+         topicNames.add(KafkaQueueManager.SERVICE_TOPIC_PREFIX + serviceProps.getId());
+     }
+
+     final List<KafkaTopicStatus> statusPerTopic = new ArrayList<KafkaTopicStatus>();
+     for (String topicName : topicNames) {
+         statusPerTopic.add(getTopicStatus(topicName, kafkaMonitor));
+     }
+     kafkaStatus.setTopicStatus(statusPerTopic);
+     return kafkaStatus;
+ }
+
+ /**
+  * Collects the status of a single topic: message counts per broker partition,
+  * consumer group offsets and partition/broker information.
+  *
+  * To reduce clutter a consumer group is only included if at least one of its
+  * partitions has an offset greater than -1.
+  *
+  * @param topic   the topic to inspect
+  * @param monitor the monitor used to query Kafka / Zookeeper
+  * @return the populated topic status
+  */
+ private KafkaTopicStatus getTopicStatus(String topic, KafkaMonitor monitor) {
+     final KafkaTopicStatus status = new KafkaTopicStatus();
+     status.setTopic(topic);
+     status.setBrokerPartitionMessageInfo(monitor.getMessageCountForTopic(zookeeperConnectString, topic));
+
+     final List<KafkaGroupOffsetInfo> groupOffsets = new ArrayList<KafkaGroupOffsetInfo>();
+     for (String groupId : monitor.getConsumerGroups(zookeeperConnectString, topic)) {
+         final KafkaGroupOffsetInfo groupInfo = new KafkaGroupOffsetInfo();
+         groupInfo.setGroupId(groupId);
+         final List<PartitionOffsetInfo> partitionOffsets = monitor.getOffsetsForTopic(zookeeperConnectString, groupId, topic);
+
+         // stop scanning at the first partition that has any offset
+         boolean hasAnyOffset = false;
+         final Iterator<PartitionOffsetInfo> offsets = partitionOffsets.iterator();
+         while (!hasAnyOffset && offsets.hasNext()) {
+             hasAnyOffset = offsets.next().getOffset() > -1;
+         }
+         if (hasAnyOffset) {
+             groupInfo.setOffsets(partitionOffsets);
+             groupOffsets.add(groupInfo);
+         }
+     }
+     status.setConsumerGroupOffsetInfo(groupOffsets);
+
+     status.setPartitionBrokersInfo(monitor.getPartitionInfoForTopic(zookeeperConnectString, topic));
+     return status;
+ }
+
+ /**
+  * Starts one consumer thread per registered notification listener.
+  *
+  * Each listener consumes its own topic in the group "ODFNotificationGroup" +
+  * topic with the "latest" offset reset policy. Exceptions thrown by a listener
+  * are logged and ignored. Listeners whose thread is already RUNNING are
+  * reported with a "no new thread, ready" result.
+  *
+  * @return one startup result per listener; empty if there are no listeners
+  */
+ private List<ThreadStartupResult> scheduleNotificationListenerThreads() {
+     final NotificationManager notificationManager = new ODFInternalFactory().create(NotificationManager.class);
+     final List<NotificationListener> registeredListeners = notificationManager.getListeners();
+     final List<ThreadStartupResult> startupResults = new ArrayList<ThreadStartupResult>();
+     if (registeredListeners == null) {
+         return startupResults;
+     }
+     final OpenDiscoveryFramework odf = new ODFFactory().create();
+     for (final NotificationListener listener : registeredListeners) {
+         final String listenerTopic = listener.getTopicName();
+         final Properties consumerProps = getConsumerConfigProperties("ODFNotificationGroup" + listenerTopic, true);
+         final String listenerThreadName = "NotificationListenerThread" + listenerTopic;
+
+         if (threadManager.getStateOfUnmanagedThread(listenerThreadName) == ThreadStatus.ThreadState.RUNNING) {
+             startupResults.add(new ThreadStartupResult(listenerThreadName) {
+                 @Override
+                 public boolean isNewThreadCreated() {
+                     return false;
+                 }
+
+                 @Override
+                 public boolean isReady() {
+                     return true;
+                 }
+             });
+             continue;
+         }
+
+         // forwards every message to the listener and shields the consumer from listener failures
+         final QueueMessageProcessor forwardingProcessor = new QueueMessageProcessor() {
+
+             @Override
+             public void process(ExecutorService executorService, String msg, int partition, long msgOffset) {
+                 try {
+                     listener.onEvent(msg, odf);
+                 } catch(Exception exc) {
+                     String errorMsg = MessageFormat.format("Notification listsner ''{0}'' has thrown an exception. Ignoring it", listener.getName());
+                     logger.log(Level.WARNING, errorMsg, exc);
+                 }
+             }
+         };
+         final KafkaQueueConsumer listenerConsumer = new KafkaQueueConsumer(listenerTopic, consumerProps, forwardingProcessor);
+         startupResults.add(threadManager.startUnmanagedThread(listenerThreadName, listenerConsumer));
+     }
+     return startupResults;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaRuntimeConsumer.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaRuntimeConsumer.java b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaRuntimeConsumer.java
new file mode 100755
index 0000000..73d98e7
--- /dev/null
+++ b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaRuntimeConsumer.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.controlcenter.ODFRunnable;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+
+/**
+ * This consumer is started for a certain runtime and starts a KafkaQueueConsumer if
+ * the runtime is available.
+ *
+ * While the runtime reports a wait time greater than zero, the consumer sleeps for
+ * that time and checks again. The inner KafkaQueueConsumer is run synchronously and
+ * is asked to stop consuming as soon as this consumer is cancelled or the runtime
+ * becomes unavailable.
+ *
+ * {@link #cancel()} may be called from a different thread than the one executing
+ * {@link #run()}; the state shared between the two is therefore volatile.
+ */
+public class KafkaRuntimeConsumer implements ODFRunnable {
+
+    Logger logger = Logger.getLogger(KafkaRuntimeConsumer.class.getName());
+
+    private ServiceRuntime runtime;
+    // set by cancel(), read by run() and by the consumption callback
+    private volatile boolean isShutdown = false;
+    private ExecutorService executorService = null;
+    // set and cleared by run(), read by cancel()
+    private volatile KafkaQueueConsumer kafkaQueueConsumer = null;
+
+    private String topic;
+    private Properties config;
+    private QueueMessageProcessor processor;
+
+    // Tells the inner consumer to stop when we are cancelled or the runtime is not available
+    private KafkaQueueConsumer.ConsumptionCallback callback = new KafkaQueueConsumer.ConsumptionCallback() {
+        @Override
+        public boolean stopConsumption() {
+            return isShutdown || (runtime.getWaitTimeUntilAvailable() > 0);
+        }
+    };
+
+    /**
+     * @param runtime   runtime whose availability controls consumption
+     * @param topicName topic to consume from
+     * @param config    Kafka consumer configuration
+     * @param processor processor handed to the inner KafkaQueueConsumer
+     */
+    public KafkaRuntimeConsumer(ServiceRuntime runtime, String topicName, Properties config, QueueMessageProcessor processor) {
+        this.runtime = runtime;
+        this.processor = processor;
+        this.topic = topicName;
+        this.config = config;
+    }
+
+    /**
+     * Loops until cancelled: runs a KafkaQueueConsumer while the runtime is available,
+     * otherwise sleeps for the wait time reported by the runtime.
+     *
+     * @throws RuntimeException wrapping the InterruptedException if the thread is
+     *                          interrupted while waiting for the runtime
+     */
+    @Override
+    public void run() {
+        logger.log(Level.INFO, "Starting runtime consumer for topic ''{0}''", topic);
+        while (!isShutdown) {
+            long waitTime = runtime.getWaitTimeUntilAvailable();
+            if (waitTime <= 0) {
+                logger.log(Level.INFO, "Starting Kafka consumer for topic ''{0}''", topic);
+                final KafkaQueueConsumer consumer = new KafkaQueueConsumer(topic, config, processor, callback);
+                consumer.setExecutorService(executorService);
+                kafkaQueueConsumer = consumer;
+                // run consumer synchronously
+                consumer.run();
+                logger.log(Level.INFO, "Kafka consumer for topic ''{0}'' is finished", topic);
+
+                // if we are here, this means that the consumer was cancelled
+                // either directly or (more likely) through the Consumption callback
+                kafkaQueueConsumer = null;
+            } else {
+                try {
+                    logger.log(Level.FINER, "Runtime ''{0}'' is not available, waiting for ''{1}''ms", new Object[]{runtime.getName(), waitTime });
+                    Thread.sleep(waitTime);
+                } catch (InterruptedException e) {
+                    // restore the interrupt flag so code further up the stack can see it
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        logger.log(Level.INFO, "Kafka runtime consumer for topic ''{0}'' has shut down", topic);
+    }
+
+    @Override
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    /**
+     * Requests shutdown and cancels the inner consumer if one is currently running.
+     */
+    @Override
+    public void cancel() {
+        isShutdown = true;
+        // work on a snapshot: run() may clear the field concurrently
+        final KafkaQueueConsumer current = kafkaQueueConsumer;
+        if (current != null) {
+            current.cancel();
+        }
+    }
+
+    /**
+     * @return always true
+     */
+    @Override
+    public boolean isReady() {
+        return true;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/MessageSearchConsumer.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/MessageSearchConsumer.java b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/MessageSearchConsumer.java
new file mode 100755
index 0000000..9c08f3a
--- /dev/null
+++ b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/MessageSearchConsumer.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging.kafka;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.controlcenter.ODFRunnable;
+import org.apache.atlas.odf.api.engine.PartitionOffsetInfo;
+
+public class MessageSearchConsumer implements ODFRunnable {
+ // Timeout passed to every KafkaConsumer.poll call
+ private static final long POLLING_DURATION_MS = 100;
+ // Maximum number of polls before the search loop gives up
+ private static final int MAX_POLL_COUNT = 5;
+
+ private Logger logger = Logger.getLogger(MessageSearchConsumer.class.getName());
+ private SearchCompletedCallback searchCompletedCallback;
+ private List<String> searchStrings;
+ protected String topic;
+ private KafkaConsumer<String, String> kafkaConsumer;
+ // set by cancel(), which may run on a different thread than run()
+ private volatile boolean shutdown;
+ // set by run() after the first poll, read through isReady() by other threads
+ private volatile boolean ready = false;
+ private List<PartitionOffsetInfo> maxOffsetsForTopic = new ArrayList<PartitionOffsetInfo>();
+
+
+ /**
+  * Creates a search consumer; the arguments are stored through the corresponding setters.
+  *
+  * @param topic                topic to search
+  * @param completitionCallback callback invoked with the search result
+  * @param searchStrings        strings to look for in message keys and values
+  */
+ public MessageSearchConsumer(String topic, SearchCompletedCallback completitionCallback, List<String> searchStrings) {
+ setTopic(topic);
+ setSearchStrings(searchStrings);
+ setCompletitionCallback(completitionCallback);
+ }
+
+ /**
+  * Creates an unconfigured search consumer; topic, search strings and callback
+  * are left unset and can be provided through the setters.
+  */
+ public MessageSearchConsumer() {
+ }
+
+ /**
+  * Retrieves, for every partition of the topic, the offset information of the
+  * last message as reported by the {@link KafkaMonitor}.
+  *
+  * @return one entry per partition; empty if no Zookeeper connect string is configured
+  */
+ protected List<PartitionOffsetInfo> retrieveTopicOffsets() {
+     final List<PartitionOffsetInfo> lastOffsets = new ArrayList<PartitionOffsetInfo>();
+     final String zkConnect = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();
+     if (zkConnect == null) {
+         return lastOffsets;
+     }
+
+     final KafkaMonitor monitor = new ODFInternalFactory().create(KafkaMonitor.class);
+     for (int partition : monitor.getPartitionsForTopic(zkConnect, this.topic)) {
+         lastOffsets.add(monitor.getOffsetsOfLastMessagesForTopic(zkConnect, this.topic, partition));
+     }
+     return lastOffsets;
+ }
+
+ /**
+  * @param topic the topic that is searched by run()
+  */
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ /**
+  * @param searchStrings strings that run() compares against message keys (equality)
+  *                      and message values (containment)
+  */
+ public void setSearchStrings(List<String> searchStrings) {
+ this.searchStrings = searchStrings;
+ }
+
+ /**
+  * @param completitionCallback callback that run() invokes with the search result
+  */
+ public void setCompletitionCallback(SearchCompletedCallback completitionCallback) {
+ this.searchCompletedCallback = completitionCallback;
+ }
+
+ /**
+  * Builds the Kafka consumer configuration used for the search.
+  *
+  * A random group id is generated for every call, the offset reset policy is
+  * "earliest", key and value are deserialized as Strings and auto commit is
+  * disabled. The bootstrap servers are the brokers reported by the
+  * {@link KafkaMonitor}, joined with commas.
+  *
+  * @return the populated consumer properties
+  */
+ protected Properties getKafkaConsumerProperties() {
+     Properties consumerProperties = new ODFFactory().create().getSettingsManager().getKafkaConsumerProperties();
+     consumerProperties.put("group.id", UUID.randomUUID().toString() + "_searchConsumer");
+     final String zookeeperConnect = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();
+     // Properties rejects null values; only set the connect string if one is configured
+     if (zookeeperConnect != null) {
+         consumerProperties.put("zookeeper.connect", zookeeperConnect);
+     }
+     consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+     consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+     consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+     final Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperConnect).iterator();
+     StringBuilder brokersString = new StringBuilder();
+     while (brokers.hasNext()) {
+         brokersString.append(brokers.next());
+         if (brokers.hasNext()) {
+             brokersString.append(",");
+         }
+     }
+     consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString.toString());
+     consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+     return consumerProperties;
+ }
+
+ /**
+  * Searches the topic for the configured search strings.
+  *
+  * The offsets of the last messages are retrieved first. Messages are then
+  * polled (at most MAX_POLL_COUNT polls) and every message whose key equals, or
+  * whose value contains, a search string is recorded with its partition and
+  * offset. When the last message of every non-empty partition has been seen,
+  * the callback (if set) is invoked with the result. Exceptions are logged and
+  * not propagated; the consumer is always closed at the end.
+  *
+  * NOTE(review): if the poll limit is reached before all partitions were read
+  * completely, the callback is not invoked - confirm callers handle this.
+  */
+ @Override
+ public void run() {
+     this.maxOffsetsForTopic = retrieveTopicOffsets();
+     final String logPrefix = "Consumer for topic " + topic + ": ";
+     try {
+
+         Map<Integer, Boolean> maxOffsetReachedMap = new HashMap<Integer, Boolean>();
+         if (maxOffsetsForTopic.isEmpty()) {
+             logger.info("No offsets found for topic " + this.topic + ", therefore no matching messages can be found");
+             if (searchCompletedCallback != null) {
+                 searchCompletedCallback.onDoneSearching(new HashMap<String, PartitionOffsetInfo>());
+                 return;
+             }
+         }
+         // Look up max offsets by partition id rather than by list position: the list
+         // order is not guaranteed to correspond to partition ids.
+         Map<Integer, PartitionOffsetInfo> maxOffsetByPartition = new HashMap<Integer, PartitionOffsetInfo>();
+         for (PartitionOffsetInfo info : maxOffsetsForTopic) {
+             maxOffsetByPartition.put(info.getPartitionId(), info);
+             //if the max offset is -1, no message exists on the partition
+             if (info.getOffset() > -1) {
+                 maxOffsetReachedMap.put(info.getPartitionId(), false);
+             }
+         }
+
+         Map<String, PartitionOffsetInfo> resultMap = new HashMap<String, PartitionOffsetInfo>();
+
+         Properties consumerProperties = getKafkaConsumerProperties();
+
+         if (this.kafkaConsumer == null) {
+             logger.fine(logPrefix + " create new consumer for topic " + topic);
+             try {
+                 this.kafkaConsumer = new KafkaConsumer<String, String>(consumerProperties);
+                 // subscribe() uses group-managed partition assignment.
+                 // NOTE(review): an earlier note here asked for manual partition assignment to
+                 // avoid losing partitions in a rebalance; presumably the random group id from
+                 // getKafkaConsumerProperties() keeps this consumer alone in its group - confirm.
+                 kafkaConsumer.subscribe(Arrays.asList(topic));
+             } catch (ZkTimeoutException zkte) {
+                 String zkHosts = consumerProperties.getProperty("zookeeper.connect");
+                 logger.log(Level.SEVERE, logPrefix + " Could not connect to the Zookeeper instance at ''{0}''. Please ensure that Zookeeper is running", zkHosts);
+                 throw zkte;
+             }
+         }
+         logger.log(Level.INFO, logPrefix + " Consumer " + "''{1}'' is now listening on ODF queue ''{0}'' with configuration {2}",
+                 new Object[] { topic, kafkaConsumer, consumerProperties });
+
+         int pollCount = 0;
+         while (!Thread.interrupted() && pollCount < MAX_POLL_COUNT && !shutdown && kafkaConsumer != null) {
+             logger.info("searching ...");
+             pollCount++;
+             ConsumerRecords<String, String> records = kafkaConsumer.poll(POLLING_DURATION_MS);
+             ready = true;
+             final Iterator<ConsumerRecord<String, String>> polledRecords = records.records(topic).iterator();
+
+             while (polledRecords.hasNext() && !shutdown) {
+                 final ConsumerRecord<String, String> next = polledRecords.next();
+                 for (String s : searchStrings) {
+                     if ((next.key() != null && next.key().equals(s)) || (next.value() != null && next.value().contains(s))) {
+                         final PartitionOffsetInfo position = new PartitionOffsetInfo();
+                         position.setOffset(next.offset());
+                         position.setPartitionId(next.partition());
+                         resultMap.put(s, position);
+                     }
+                 }
+
+                 // mark the partition as done once its last known message has been seen
+                 final PartitionOffsetInfo maxOffset = maxOffsetByPartition.get(next.partition());
+                 if (maxOffset != null && next.offset() == maxOffset.getOffset()) {
+                     maxOffsetReachedMap.put(next.partition(), true);
+                 }
+
+                 boolean allCompleted = true;
+                 for (Entry<Integer, Boolean> entry : maxOffsetReachedMap.entrySet()) {
+                     if (!entry.getValue()) {
+                         allCompleted = false;
+                         break;
+                     }
+                 }
+
+                 if (allCompleted) {
+                     logger.info("Done searching all messages");
+                     if (searchCompletedCallback != null) {
+                         searchCompletedCallback.onDoneSearching(resultMap);
+                         return;
+                     }
+                     shutdown = true;
+                 }
+             }
+         }
+     } catch (Exception exc) {
+         String msg = MessageFormat.format(" Caught exception on queue ''{0}''", topic);
+         logger.log(Level.WARNING, logPrefix + msg, exc);
+     } finally {
+         if (kafkaConsumer != null) {
+             logger.log(Level.FINE, logPrefix + "Closing consumer " + " on topic ''{0}''", topic);
+             kafkaConsumer.close();
+             logger.log(Level.FINE, logPrefix + "Closed consumer " + " on topic ''{0}''", topic);
+             kafkaConsumer = null;
+         }
+     }
+     logger.log(Level.FINE, logPrefix + "Finished consumer on topic ''{0}''", topic);
+ }
+
+ /**
+  * Intentionally empty: the given executor service is ignored by this consumer.
+  */
+ @Override
+ public void setExecutorService(ExecutorService service) {
+
+ }
+
+ /**
+  * Requests the search to stop by setting the shutdown flag, which the
+  * polling loops in run() check.
+  */
+ @Override
+ public void cancel() {
+ this.shutdown = true;
+ }
+
+ /**
+  * @return true once run() has completed its first poll
+  */
+ @Override
+ public boolean isReady() {
+ return ready;
+ }
+
+ /**
+  * Callback through which run() reports the result of a search.
+  */
+ public interface SearchCompletedCallback {
+ // msgPositionMap maps each search string that was found to the partition / offset of a matching message
+ void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/main/resources/org/apache/atlas/odf/odf-implementation.properties b/odf/odf-messaging/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
new file mode 100755
index 0000000..95c1f71
--- /dev/null
+++ b/odf/odf-messaging/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+DiscoveryServiceQueueManager=org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueConsumerExceptionTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueConsumerExceptionTest.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueConsumerExceptionTest.java
new file mode 100755
index 0000000..396193f
--- /dev/null
+++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueConsumerExceptionTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.engine.ThreadStatus.ThreadState;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaMonitor;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+
+import kafka.admin.AdminUtils;
+import kafka.common.TopicExistsException;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+public class KafkaQueueConsumerExceptionTest extends ODFTestcase {
+ static Logger logger = ODFTestLogger.get(); // logger shared by all tests of this class
+ static final String topicName = "my_dummy_test_topic"; // topic created in setupTopic() and written to by sendMsg()
+ static String zookeeperHost = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString(); // Zookeeper connect string taken from the ODF environment
+
+    /**
+     * Creates the test topic with one partition and a replication factor of one,
+     * unless the topic already exists.
+     * A Zookeeper connection timeout is logged but not rethrown.
+     */
+    @BeforeClass
+    public static void setupTopic() {
+        ZkClient zkClient = null;
+        try {
+            zkClient = new ZkClient(zookeeperHost, 5000, 5000, ZKStringSerializer$.MODULE$);
+            //FIXME zkUtils isSecure parameter? Only with SSL! --> parse zkhosts?
+            ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHost), false);
+            logger.log(Level.FINEST, "Checking if topic ''{0}'' already exists", topicName);
+            // actually perform the check that the log message announces
+            if (AdminUtils.topicExists(zkUtils, topicName)) {
+                logger.log(Level.FINE, "Topic ''{0}'' already exists.", topicName);
+                return;
+            }
+            // using partition size 1 and replication size 1, no special
+            // per-topic config needed
+            logger.log(Level.FINE, "Topic ''{0}'' does not exist, creating it", topicName);
+            AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), KafkaQueueManager.DEFAULT_RACK_AWARE_MODE);
+            logger.log(Level.FINE, "Topic ''{0}'' created", topicName);
+        } catch (TopicExistsException ex) {
+            // still possible when the topic is created concurrently between check and create
+            logger.log(Level.FINE, "Topic ''{0}'' already exists.", topicName);
+        } catch (ZkTimeoutException zkte) {
+            logger.log(Level.SEVERE, "Could not connect to the Zookeeper instance at ''{0}''. Please ensure that Zookeeper is running", zookeeperHost);
+        } finally {
+            if (zkClient != null) {
+                zkClient.close();
+            }
+        }
+    }
+
+    /**
+     * Runs a consumer whose processor records each delivery and then always throws.
+     * After two messages were sent, the processor must have been invoked
+     * 2 * MAX_PROCESSING_EXCEPTIONS times and the consumer thread must still be running.
+     */
+    @Test
+    public void testExceptionAndRetryDuringProcessing() throws InterruptedException, ExecutionException, TimeoutException {
+        final ODFInternalFactory factory = new ODFInternalFactory();
+        final String consumerGroup = "retrying-exception-dummy-consumer";
+        final Properties consumerProps = new KafkaQueueManager().getConsumerConfigProperties(consumerGroup, true);
+        consumerProps.put("group.id", consumerGroup);
+
+        // every delivery attempt ends up in this list
+        final List<String> deliveredMessages = new ArrayList<String>();
+        final QueueMessageProcessor alwaysFailingProcessor = new QueueMessageProcessor() {
+
+            @Override
+            public void process(ExecutorService executorService, String msg, int partition, long offset) {
+                deliveredMessages.add(msg);
+                logger.info("retry_consumer process " + msg + " throw exception and try again");
+                throw new RuntimeException("Oops!");
+            }
+        };
+        final KafkaQueueConsumer consumer = new KafkaQueueConsumer(topicName, consumerProps, alwaysFailingProcessor);
+
+        final ThreadManager threads = factory.create(ThreadManager.class);
+        final String threadId = "TEST_CONSUMER_RETRY_RUNNING";
+        threads.waitForThreadsToBeReady(10000, Arrays.asList(threads.startUnmanagedThread(threadId, consumer)));
+
+        sendMsg("TEST_MSG");
+        sendMsg("TEST_MSG2");
+
+        // give the consumer time to go through all retries
+        Thread.sleep(2000);
+
+        final int expectedDeliveries = 2 * KafkaQueueConsumer.MAX_PROCESSING_EXCEPTIONS;
+        Assert.assertEquals(expectedDeliveries, deliveredMessages.size());
+
+        // the processing exceptions must not have terminated the consumer thread
+        Assert.assertEquals(ThreadState.RUNNING, threads.getStateOfUnmanagedThread(threadId));
+    }
+
+    /**
+     * Sends a single message with a random key to the test topic and waits up to
+     * three seconds for the send to complete. A fresh producer is created per call
+     * and is always closed, even when sending fails or times out.
+     *
+     * @param msg message value to send
+     */
+    void sendMsg(String msg) throws InterruptedException, ExecutionException, TimeoutException {
+        SettingsManager odfConfig = new ODFFactory().create().getSettingsManager();
+
+        Properties props = odfConfig.getKafkaProducerProperties();
+        // build the comma separated bootstrap server list from the known brokers
+        final Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperHost).iterator();
+        StringBuilder brokersString = new StringBuilder();
+        while (brokers.hasNext()) {
+            brokersString.append(brokers.next());
+            if (brokers.hasNext()) {
+                brokersString.append(",");
+            }
+        }
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString.toString());
+
+        final KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
+        try {
+            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, UUID.randomUUID().toString(), msg);
+            producer.send(producerRecord).get(3000, TimeUnit.MILLISECONDS);
+        } finally {
+            // release the producer also when send/get throws
+            producer.close();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueManagerTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueManagerTest.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueManagerTest.java
new file mode 100755
index 0000000..cff538c
--- /dev/null
+++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueManagerTest.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.settings.MessagingConfiguration;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.wink.json4j.JSONException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.engine.ThreadStatus.ThreadState;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore;
+import org.apache.atlas.odf.core.controlcenter.DefaultThreadManager;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager.ThreadStartupResult;
+import org.apache.atlas.odf.core.controlcenter.TrackerUtil;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaMonitor;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaProducerManager;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class KafkaQueueManagerTest extends ODFTestBase {
+
+ private static Long origRetention; // retention read in setupTrackerRetention(), written back in cleanupTrackerRetention()
+ Logger logger = ODFTestLogger.get(); // per-instance test logger
+ String zookeeperConnectString = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString(); // Zookeeper connect string taken from the ODF environment
+
+    /**
+     * Remembers the configured analysis request retention and raises it to
+     * 120000000 ms for the duration of the tests. The changed settings are written
+     * back through the settings manager, mirroring cleanupTrackerRetention().
+     *
+     * @throws ValidationException if the settings manager rejects the updated settings
+     */
+    @BeforeClass
+    public static void setupTrackerRetention() throws ValidationException {
+        SettingsManager settingsManager = new ODFFactory().create().getSettingsManager();
+        //SETUP RETENTION TO KEEP TRACKERS!!!
+        ODFSettings settings = settingsManager.getODFSettings();
+        final MessagingConfiguration messagingConfiguration = settings.getMessagingConfiguration();
+        origRetention = messagingConfiguration.getAnalysisRequestRetentionMs();
+        messagingConfiguration.setAnalysisRequestRetentionMs(120000000L);
+        // persist the change; modifying the returned settings object alone is not guaranteed to take effect
+        settingsManager.updateODFSettings(settings);
+
+        ODFTestLogger.get().info("Set request retention to " + settingsManager.getODFSettings().getMessagingConfiguration().getAnalysisRequestRetentionMs());
+    }
+
+    /**
+     * Writes the retention value remembered in origRetention back into the ODF settings.
+     *
+     * @throws ValidationException declared for the settings update call
+     */
+    @AfterClass
+    public static void cleanupTrackerRetention() throws ValidationException {
+        final SettingsManager manager = new ODFFactory().create().getSettingsManager();
+        final ODFSettings currentSettings = manager.getODFSettings();
+        final MessagingConfiguration messaging = currentSettings.getMessagingConfiguration();
+        messaging.setAnalysisRequestRetentionMs(origRetention);
+        manager.updateODFSettings(currentSettings);
+    }
+
+    /**
+     * Enqueues the same tracker under ten different request ids in the status queue and
+     * checks that each id can afterwards be queried from a DefaultStatusQueueStore with
+     * status FINISHED.
+     */
+    @Test
+    public void testStatusQueue() throws Exception {
+        final KafkaQueueManager queueManager = new KafkaQueueManager();
+
+        logger.info("Queue manager created");
+        final AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null);
+
+        final long start = System.currentTimeMillis();
+        tracker.setLastModified(start);
+        final int maxEntries = 10;
+        int enqueued = 0;
+        while (enqueued < maxEntries) {
+            // the tracker object is reused, only the request id changes per entry
+            tracker.getRequest().setId("id" + enqueued);
+            final StatusQueueEntry entry = new StatusQueueEntry();
+            entry.setAnalysisRequestTracker(tracker);
+            queueManager.enqueueInStatusQueue(entry);
+            enqueued++;
+        }
+        final long elapsed = System.currentTimeMillis() - start;
+        logger.info("Time for enqueueing " + maxEntries + " objects: " + elapsed + ", " + (elapsed / maxEntries) + "ms per object");
+        // leave the status queue some time to be consumed before querying
+        Thread.sleep(100 * maxEntries);
+
+        final AnalysisRequestTrackerStore trackerStore = new DefaultStatusQueueStore();
+
+        for (int idx = 0; idx < maxEntries; idx++) {
+            logger.info("Querying status " + idx);
+            final AnalysisRequestTracker found = trackerStore.query("id" + idx);
+            Assert.assertNotNull(found);
+            Assert.assertEquals(STATUS.FINISHED, found.getStatus());
+        }
+
+        logger.info("Test testEnqueueStatusQueue finished");
+    }
+
+    /**
+     * This test creates a tracker, puts it on the status queue, kills the service consumer and creates a new dummy consumer to put the offset of the service consumer behind the new tracker.
+     * Then the status consumer is shut down and its offset is reset in order to make it consume from the start again and thereby cleaning up stuck processes.
+     * Then kafka queue manager is re-initialized, causing all consumers to come up and triggering the cleanup process.
+     * Finally the stored tracker is expected to be in status ERROR.
+     */
+    @Test
+    @Ignore("Adjust once ServiceRuntimes are fully implemented")
+    public void testStuckRequestCleanup() throws JSONException, InterruptedException, ExecutionException, TimeoutException {
+        final AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json",
+                null);
+        tracker.setStatus(STATUS.IN_DISCOVERY_SERVICE_QUEUE);
+        tracker.setNextDiscoveryServiceRequest(0);
+        tracker.setLastModified(System.currentTimeMillis());
+        final String newTrackerId = "KAFKA_QUEUE_MANAGER_09_TEST" + UUID.randomUUID().toString();
+        tracker.getRequest().setId(newTrackerId);
+        DiscoveryServiceRequest dsRequest = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
+        final DiscoveryServiceProperties discoveryServiceRegistrationInfo = new ODFFactory().create().getDiscoveryServiceManager().getDiscoveryServicesProperties()
+                .get(0);
+        dsRequest.setDiscoveryServiceId(discoveryServiceRegistrationInfo.getId());
+        String dsID = dsRequest.getDiscoveryServiceId();
+        String topicName = KafkaQueueManager.SERVICE_TOPIC_PREFIX + dsID;
+        //Add tracker to queue, set offset behind request so that it should be cleanup
+
+        String consumerGroupId = "odf-topic-" + dsID + "_group";
+        String threadName = "Dummy_DiscoveryServiceQueueConsumer" + topicName;
+
+        final List<Throwable> multiThreadErrors = new ArrayList<Throwable>();
+        final DefaultThreadManager tm = new DefaultThreadManager();
+        logger.info("shutdown old test 09 consumer and replace with fake doing nothing");
+        for (int no = 0; no < discoveryServiceRegistrationInfo.getParallelismCount(); no++) {
+            tm.shutdownThreads(Collections.singletonList("DiscoveryServiceQueueConsumer" + topicName + "_" + no));
+        }
+        Properties kafkaConsumerProps = getKafkaConsumerConfigProperties(consumerGroupId);
+
+        // filled by the producer callback with the offset of the message sent below
+        final long[] producedMsgOffset = new long[1];
+
+        final CountDownLatch msgProcessingLatch = new CountDownLatch(1);
+        ThreadStartupResult created = tm.startUnmanagedThread(threadName, new KafkaQueueConsumer(topicName, kafkaConsumerProps, new QueueMessageProcessor() {
+
+            @Override
+            public void process(ExecutorService executorService, String msg, int partition, long msgOffset) {
+                logger.info("Dequeue without processing " + msgOffset);
+                if (msgOffset == producedMsgOffset[0]) {
+                    try {
+                        msgProcessingLatch.countDown();
+                    } catch (Exception e) {
+                        msgProcessingLatch.countDown();
+                        multiThreadErrors.add(e);
+                    }
+                }
+            }
+
+        }));
+
+        tm.waitForThreadsToBeReady(30000, Arrays.asList(created));
+
+        String key = tracker.getRequest().getId();
+        String value = JSONUtils.toJSON(tracker);
+
+        new DefaultStatusQueueStore().store(tracker);
+
+        KafkaMonitor kafkaMonitor = new ODFInternalFactory().create(KafkaMonitor.class);
+        List<String> origQueueConsumers = kafkaMonitor.getConsumerGroups(zookeeperConnectString, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE);
+        logger.info("Found status consumers: " + origQueueConsumers.toString() + ", shutting down StatusWatcher");
+
+        //kill status queue watcher so that it is restarted when queue manager is initialized and detects stuck requests
+        final String statusWatcherThread = "StatusWatcher" + KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE + "_0";
+        tm.shutdownThreads(Collections.singletonList(statusWatcherThread));
+
+        // Wait (at most 60 * 500 ms) until the watcher is gone. The thread counts as gone when it is
+        // either NON_EXISTENT or FINISHED; the previous "a != X || a != Y && n > 0" condition never
+        // stopped early and looped forever once the thread reached FINISHED.
+        int maxWaitForConsumerDeath = 60;
+        ThreadState watcherState = tm.getStateOfUnmanagedThread(statusWatcherThread);
+        while (watcherState != ThreadState.NON_EXISTENT && watcherState != ThreadState.FINISHED && maxWaitForConsumerDeath > 0) {
+            maxWaitForConsumerDeath--;
+            Thread.sleep(500);
+            watcherState = tm.getStateOfUnmanagedThread(statusWatcherThread);
+        }
+
+        logger.info("Only 1 consumer left? " + maxWaitForConsumerDeath + " : " + tm.getStateOfUnmanagedThread(statusWatcherThread));
+        logger.info(" set offset for status consumer to beginning so that it consumes from when restarting");
+        // NOTE(review): the log above says "beginning" but the offset passed is 1000000 - confirm intent
+        final int offset = 1000000;
+        for (String statusConsumerGroup : origQueueConsumers) {
+            if (statusConsumerGroup.contains("DSStatusWatcherConsumerGroup")) {
+                boolean success = false;
+                int retryCount = 0;
+                final int maxOffsetRetry = 20;
+                while (!success && retryCount < maxOffsetRetry) {
+                    success = kafkaMonitor.setOffset(zookeeperConnectString, statusConsumerGroup, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE, 0, offset);
+                    retryCount++;
+                    Thread.sleep(500);
+                }
+
+                // only the outcome matters; a success on the very last attempt is still a success
+                Assert.assertTrue(success);
+            }
+        }
+
+        new ODFInternalFactory().create(KafkaProducerManager.class).sendMsg(topicName, key, value, new Callback() {
+
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                if (exception != null) {
+                    // no usable metadata when the send failed; record the failure instead
+                    multiThreadErrors.add(exception);
+                    return;
+                }
+                producedMsgOffset[0] = metadata.offset();
+            }
+        });
+
+        final boolean await = msgProcessingLatch.await(240, TimeUnit.SECONDS);
+        Assert.assertTrue(await);
+        if (await) {
+            logger.info("run after message consumption...");
+            AnalysisRequestTrackerStore store = new ODFInternalFactory().create(AnalysisRequestTrackerStore.class);
+            AnalysisRequestTracker storeTracker = store.query(tracker.getRequest().getId());
+            Assert.assertEquals(tracker.getRequest().getId(), storeTracker.getRequest().getId());
+            Assert.assertEquals(STATUS.IN_DISCOVERY_SERVICE_QUEUE, storeTracker.getStatus());
+
+            //start odf and cleanup here...
+            logger.info("shutdown all threads and restart ODF");
+            tm.shutdownAllUnmanagedThreads();
+
+            int threadKillRetry = 0;
+            while (tm.getNumberOfRunningThreads() > 0 && threadKillRetry < 20) {
+                Thread.sleep(500);
+                threadKillRetry++;
+            }
+
+            logger.info("All threads down, restart ODF " + threadKillRetry);
+
+            // Initialize analysis manager
+            new ODFFactory().create().getAnalysisManager();
+
+            kafkaMonitor = new ODFInternalFactory().create(KafkaMonitor.class);
+            origQueueConsumers = kafkaMonitor.getConsumerGroups(zookeeperConnectString, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE);
+            int healthRetrieveRetry = 0;
+            //wait for max of 120 secs (240 * 500 ms) for status consumer to come up. If it is, we can continue because ODF is restarted successfully
+            while (origQueueConsumers.isEmpty() && healthRetrieveRetry < 240) {
+                healthRetrieveRetry++;
+                Thread.sleep(500);
+                origQueueConsumers = kafkaMonitor.getConsumerGroups(zookeeperConnectString, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE);
+            }
+            // check the outcome rather than the retry counter, which also hits 240 on a last-attempt success
+            Assert.assertFalse(origQueueConsumers.isEmpty());
+
+            logger.info("initialized, wait for cleanup ... " + healthRetrieveRetry);
+            Thread.sleep(5000);
+            logger.info("Found health consumers: " + origQueueConsumers.toString());
+            logger.info("hopefully cleaned up ...");
+            AnalysisRequestTracker storedTracker = store.query(tracker.getRequest().getId());
+            Assert.assertEquals(STATUS.ERROR, storedTracker.getStatus());
+            logger.info("DONE CLEANING UP, ALL FINE");
+        }
+
+        Assert.assertEquals(0, multiThreadErrors.size());
+    }
+
+    /**
+     * Builds Kafka consumer properties for the given consumer group, starting from the
+     * consumer properties of the ODF settings manager. Offsets reset to "earliest",
+     * String deserializers are used for key and value, the bootstrap servers are the
+     * brokers reported by the KafkaMonitor, and auto commit is switched off.
+     *
+     * @param consumerGroupID value stored under "group.id"
+     * @return the assembled consumer properties
+     */
+    public Properties getKafkaConsumerConfigProperties(String consumerGroupID) {
+        final String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
+        final Properties consumerProps = new ODFFactory().create().getSettingsManager().getKafkaConsumerProperties();
+
+        consumerProps.put("group.id", consumerGroupID);
+        if (zookeeperConnectString != null) {
+            consumerProps.put("zookeeper.connect", zookeeperConnectString);
+        }
+        consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
+        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
+
+        // comma separated list of all brokers known to the monitor
+        final StringBuilder brokerList = new StringBuilder();
+        for (Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperConnectString).iterator(); brokers.hasNext();) {
+            brokerList.append(brokers.next());
+            if (brokers.hasNext()) {
+                brokerList.append(",");
+            }
+        }
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList.toString());
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+        return consumerProps;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MessageSearchConsumerTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MessageSearchConsumerTest.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MessageSearchConsumerTest.java
new file mode 100755
index 0000000..35b09e2
--- /dev/null
+++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MessageSearchConsumerTest.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.atlas.odf.core.messaging.kafka.MessageSearchConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.api.engine.PartitionOffsetInfo;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaMonitor;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+
+import kafka.admin.AdminUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+public class MessageSearchConsumerTest extends ODFTestBase {
+ private static final String TEST_SEARCH_STRING = "TEST_STRING_" + UUID.randomUUID().toString(); // message that is sent to the topic and searched for
+ private static final String TEST_SEARCH_FAILURE_STRING = "TEST_FAILURE_STRING"; // searched for but never sent by this class
+ static Logger logger = ODFTestLogger.get(); // logger shared by all tests of this class
+ final static String topicName = "MessageSearchConsumerTest" + UUID.randomUUID().toString(); // topic name with a random suffix, generated once per class load
+ private static final int PERFORMANCE_MSG_COUNT = 1000; // number of dummy messages produced by testMsgSearchPerformance()
+ static String zookeeperHost = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString(); // Zookeeper connect string taken from the ODF environment
+ private KafkaProducer<String, String> producer; // created lazily by getProducer()
+
+    /**
+     * Creates the test topic with two partitions and a replication factor of one if
+     * it does not exist yet. The Zookeeper client is closed afterwards in all cases.
+     */
+    @BeforeClass
+    public static void createTopc() {
+        ZkClient zkClient = new ZkClient(zookeeperHost, 5000, 5000, ZKStringSerializer$.MODULE$);
+        try {
+            ZkUtils utils = new ZkUtils(zkClient, new ZkConnection(zookeeperHost), false);
+            if (!AdminUtils.topicExists(utils, topicName)) {
+                AdminUtils.createTopic(utils, topicName, 2, 1, new Properties(), KafkaQueueManager.DEFAULT_RACK_AWARE_MODE);
+            }
+        } finally {
+            // do not leak the Zookeeper connection, neither on success nor on failure
+            zkClient.close();
+        }
+    }
+
+    /**
+     * Produces PERFORMANCE_MSG_COUNT dummy messages followed by the search string, then
+     * runs one search that is expected to find the string and one search for a string
+     * that was never sent. Each search is given five seconds; a timeout is only logged.
+     */
+    @Test
+    public void testMsgSearchPerformance() throws InterruptedException, ExecutionException, TimeoutException {
+        logger.info("Producing msgs");
+        for (int no = 0; no < PERFORMANCE_MSG_COUNT; no++) {
+            sendMsg("DUMMY_MSG" + no);
+        }
+        sendMsg(TEST_SEARCH_STRING);
+        logger.info("Done producing ...");
+        Thread.sleep(200);
+
+        final ThreadManager threadManager = new ODFInternalFactory().create(ThreadManager.class);
+        final CountDownLatch searchLatch = new CountDownLatch(1);
+        threadManager.startUnmanagedThread(UUID.randomUUID().toString() + "_searchThread", new MessageSearchConsumer(topicName, new MessageSearchConsumer.SearchCompletedCallback() {
+
+            @Override
+            public void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap) {
+                logger.info("Done searching " + msgPositionMap.get(TEST_SEARCH_STRING).getOffset());
+                Assert.assertTrue(msgPositionMap.get(TEST_SEARCH_STRING).getOffset() > -1);
+                searchLatch.countDown();
+            }
+        }, Arrays.asList(TEST_SEARCH_STRING)));
+
+        boolean await = searchLatch.await(5, TimeUnit.SECONDS);
+        if (await) {
+            logger.info("Messages searched in time");
+        } else {
+            logger.warning("Couldnt finish search in time");
+        }
+
+        final CountDownLatch failureSearchLatch = new CountDownLatch(1);
+        threadManager.startUnmanagedThread(UUID.randomUUID().toString() + "_searchThread", new MessageSearchConsumer(topicName, new MessageSearchConsumer.SearchCompletedCallback() {
+
+            @Override
+            public void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap) {
+                logger.info("Done searching " + msgPositionMap.toString());
+                Assert.assertFalse(msgPositionMap.containsKey(TEST_SEARCH_FAILURE_STRING));
+                failureSearchLatch.countDown();
+            }
+        }, Arrays.asList(TEST_SEARCH_FAILURE_STRING)));
+
+        // wait for the second search; waiting on searchLatch again would not wait for it at all
+        await = failureSearchLatch.await(5, TimeUnit.SECONDS);
+        if (await) {
+            logger.info("Messages searched in time");
+        } else {
+            logger.warning("Couldnt finish search in time");
+        }
+    }
+
+    /**
+     * Sends the search string once, then runs one search that is expected to find it
+     * and one search for a string that was never sent. Each search is given five
+     * seconds; a timeout is only logged.
+     */
+    @Test
+    public void testMsgSearchSuccessAndFailure() throws InterruptedException, ExecutionException, TimeoutException {
+        sendMsg(TEST_SEARCH_STRING);
+
+        Thread.sleep(200);
+
+        final ThreadManager threadManager = new ODFInternalFactory().create(ThreadManager.class);
+        final CountDownLatch searchLatch = new CountDownLatch(1);
+        threadManager.startUnmanagedThread(UUID.randomUUID().toString() + "_searchThread", new MessageSearchConsumer(topicName, new MessageSearchConsumer.SearchCompletedCallback() {
+
+            @Override
+            public void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap) {
+                logger.info("Done searching " + msgPositionMap.get(TEST_SEARCH_STRING).getOffset());
+                Assert.assertTrue(msgPositionMap.get(TEST_SEARCH_STRING).getOffset() > -1);
+                searchLatch.countDown();
+            }
+        }, Arrays.asList(TEST_SEARCH_STRING)));
+
+        boolean await = searchLatch.await(5, TimeUnit.SECONDS);
+        if (await) {
+            logger.info("Messages searched in time");
+        } else {
+            logger.warning("Couldnt finish search in time");
+        }
+
+        final CountDownLatch failureSearchLatch = new CountDownLatch(1);
+        threadManager.startUnmanagedThread(UUID.randomUUID().toString() + "_searchThread", new MessageSearchConsumer(topicName, new MessageSearchConsumer.SearchCompletedCallback() {
+
+            @Override
+            public void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap) {
+                logger.info("Done searching " + msgPositionMap);
+                Assert.assertFalse(msgPositionMap.containsKey(TEST_SEARCH_FAILURE_STRING));
+                failureSearchLatch.countDown();
+            }
+        }, Arrays.asList(TEST_SEARCH_FAILURE_STRING)));
+
+        // wait for the second search; waiting on searchLatch again would not wait for it at all
+        await = failureSearchLatch.await(5, TimeUnit.SECONDS);
+        if (await) {
+            logger.info("Messages searched in time");
+        } else {
+            logger.warning("Couldnt finish search in time");
+        }
+    }
+
+    /**
+     * Sends one message with a random key to the test topic through the shared producer
+     * and waits up to 15 seconds for the send to complete.
+     *
+     * @param msg message value to send
+     */
+    void sendMsg(String msg) throws InterruptedException, ExecutionException, TimeoutException {
+        final KafkaProducer<String, String> sharedProducer = getProducer();
+        final String randomKey = UUID.randomUUID().toString();
+        sharedProducer.send(new ProducerRecord<String, String>(topicName, randomKey, msg)).get(15000, TimeUnit.MILLISECONDS);
+    }
+
+ private KafkaProducer<String, String> getProducer() {
+ if (this.producer == null) {
+ SettingsManager odfConfig = new ODFFactory().create().getSettingsManager();
+ Properties props = odfConfig.getKafkaProducerProperties();
+ final Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperHost).iterator();
+ StringBuilder brokersString = new StringBuilder();
+ while (brokers.hasNext()) {
+ brokersString.append(brokers.next());
+ if (brokers.hasNext()) {
+ brokersString.append(",");
+ }
+ }
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString.toString());
+ producer = new KafkaProducer<String, String>(props);
+ }
+ return producer;
+ }
+
+    /**
+     * Closes the producer of this test instance if one was created. The field is
+     * inspected directly: going through getProducer() would create a producer just
+     * to close it again.
+     */
+    @After
+    public void closeProducer() {
+        if (producer != null) {
+            producer.close();
+            // allow a later getProducer() call to create a fresh, usable producer
+            producer = null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MultiPartitionConsumerTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MultiPartitionConsumerTest.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MultiPartitionConsumerTest.java
new file mode 100755
index 0000000..f97dd4e
--- /dev/null
+++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MultiPartitionConsumerTest.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.atlas.odf.api.engine.ThreadStatus;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInitializer;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaMonitor;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueConsumer;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager.ThreadStartupResult;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+
+import kafka.admin.AdminUtils;
+import kafka.common.TopicExistsException;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+/**
+ * Tests message consumption from a topic that is created with {@link #PARTITION_COUNT}
+ * partitions. Each test starts {@link #PARTITION_COUNT} {@link KafkaQueueConsumer}s as
+ * unmanaged threads, sends {@code PARTITION_COUNT * MSG_PER_PARTITION} messages and checks
+ * that all of them reach a message processor while the consumer threads stay in state RUNNING.
+ */
+public class MultiPartitionConsumerTest extends ODFTestcase {
+    static Logger logger = ODFTestLogger.get();
+    final static String topicName = "my_dummy_test_topic" + UUID.randomUUID().toString();
+    static String zookeeperHost = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();
+    static final int PARTITION_COUNT = 3;
+    private static final int MSG_PER_PARTITION = 5;
+    /** Total number of messages each test sends and expects to be consumed. */
+    private static final int TOTAL_MSG_COUNT = PARTITION_COUNT * MSG_PER_PARTITION;
+    /** Name prefix of the unmanaged consumer threads; the consumer index is appended. */
+    private static final String THREAD_PREFIX = "TEST_CONSUMER_RETRY_RUNNING_";
+    /** Timeout in milliseconds passed to the thread manager when waiting for the consumers to be ready. */
+    private static final int CONSUMER_STARTUP_TIMEOUT_MS = 30000;
+    private final ThreadManager threadManager = new ODFInternalFactory().create(ThreadManager.class);
+
+    /**
+     * Creates the test topic with {@link #PARTITION_COUNT} partitions and replication factor 1.
+     * An already existing topic and a Zookeeper connection timeout are only logged.
+     */
+    @BeforeClass
+    public static void setupTopic() {
+        ZkClient zkClient = null;
+        try {
+            zkClient = new ZkClient(zookeeperHost, 5000, 5000, ZKStringSerializer$.MODULE$);
+            logger.log(Level.FINEST, "Checking if topic ''{0}'' already exists", topicName);
+            // using PARTITION_COUNT partitions and replication size 1, no special
+            // per-topic config needed
+            logger.log(Level.FINE, "Topic ''{0}'' does not exist, creating it", topicName);
+            //FIXME zkUtils isSecure parameter? Only with SSL! --> parse zkhosts?
+            AdminUtils.createTopic(new ZkUtils(zkClient, new ZkConnection(zookeeperHost), false), topicName, PARTITION_COUNT, 1, new Properties(), KafkaQueueManager.DEFAULT_RACK_AWARE_MODE);
+            logger.log(Level.FINE, "Topic ''{0}'' created", topicName);
+        } catch (TopicExistsException ex) {
+            logger.log(Level.FINE, "Topic ''{0}'' already exists.", topicName);
+        } catch (ZkTimeoutException zkte) {
+            logger.log(Level.SEVERE, "Could not connect to the Zookeeper instance at ''{0}''. Please ensure that Zookeeper is running", zookeeperHost);
+        } finally {
+            if (zkClient != null) {
+                zkClient.close();
+            }
+        }
+    }
+
+    /**
+     * Restarts ODF after each test so that the consumers started by the test are cleaned up.
+     */
+    @After
+    public void cleanupConsumers() {
+        logger.info("Cleaning up consumers...");
+        logger.info("---------------------------------- Stopping ODF...");
+        ODFInitializer.stop();
+        logger.info("---------------------------------- Starting ODF...");
+        ODFInitializer.start();
+        logger.info("---------------------------------- ODF started.");
+    }
+
+    /**
+     * Verifies that all messages are consumed when every message takes two seconds to process.
+     */
+    @Test
+    public void testMultiPartitionDelayedConsumption() throws InterruptedException, ExecutionException {
+        Properties kafkaConsumerProperties = getConsumerProps();
+        // Written by several consumer threads and polled by the test thread, hence synchronized.
+        final List<String> consumedMsgs = Collections.synchronizedList(new ArrayList<String>());
+        List<ThreadStartupResult> startupList = new ArrayList<ThreadStartupResult>();
+
+        final int processingDelay = 2000;
+        for (int no = 0; no < PARTITION_COUNT; no++) {
+            final int currentThread = no;
+            final QueueMessageProcessor requestConsumer = new QueueMessageProcessor() {
+
+                @Override
+                public void process(ExecutorService executorService, String msg, int partition, long msgOffset) {
+                    try {
+                        Thread.sleep(processingDelay);
+                    } catch (InterruptedException e) {
+                        logger.log(Level.WARNING, "Interrupted while delaying message processing", e);
+                        // Restore the interrupt flag so the consumer thread can still observe it.
+                        Thread.currentThread().interrupt();
+                    }
+                    consumedMsgs.add(msg);
+                    logger.info("process " + msg + " in thread " + currentThread);
+                }
+            };
+            startupList.add(startConsumer(no, kafkaConsumerProperties, requestConsumer));
+        }
+        // Worst case all messages are processed one after another, plus 10 seconds of slack.
+        sendMessagesAndVerifyConsumption(startupList, consumedMsgs, TOTAL_MSG_COUNT * processingDelay + 10000, 2000);
+    }
+
+    /**
+     * Verifies that all messages are consumed when processing returns immediately.
+     */
+    @Test
+    public void testMultiPartitionConsumption() throws InterruptedException, ExecutionException {
+        Properties kafkaConsumerProperties = getConsumerProps();
+        // Written by several consumer threads and polled by the test thread, hence synchronized.
+        final List<String> consumedMsgs = Collections.synchronizedList(new ArrayList<String>());
+        List<ThreadStartupResult> startupList = new ArrayList<ThreadStartupResult>();
+
+        for (int no = 0; no < PARTITION_COUNT; no++) {
+            final int currentThread = no;
+            final QueueMessageProcessor requestConsumer = new QueueMessageProcessor() {
+
+                @Override
+                public void process(ExecutorService executorService, String msg, int partition, long msgOffset) {
+                    consumedMsgs.add(msg);
+                    logger.info("process " + msg + " in thread " + currentThread);
+                }
+            };
+            startupList.add(startConsumer(no, kafkaConsumerProperties, requestConsumer));
+        }
+        sendMessagesAndVerifyConsumption(startupList, consumedMsgs, 15000, 500);
+    }
+
+    /**
+     * Verifies that all messages are consumed although every processor throws an exception
+     * on its first {@code KafkaQueueConsumer.MAX_PROCESSING_EXCEPTIONS - 1} invocations.
+     */
+    @Test
+    public void testMultiPartitionExceptionAndRetryDuringProcessing() throws InterruptedException, ExecutionException {
+        Properties kafkaConsumerProperties = getConsumerProps();
+        // Written by several consumer threads and polled by the test thread, hence synchronized.
+        final List<String> consumedMsgs = Collections.synchronizedList(new ArrayList<String>());
+        List<ThreadStartupResult> startupList = new ArrayList<ThreadStartupResult>();
+
+        for (int no = 0; no < PARTITION_COUNT; no++) {
+            final int currentThread = no;
+            final QueueMessageProcessor requestConsumer = new QueueMessageProcessor() {
+
+                private int excCount = 0;
+
+                @Override
+                public void process(ExecutorService executorService, String msg, int partition, long msgOffset) {
+                    if (excCount < KafkaQueueConsumer.MAX_PROCESSING_EXCEPTIONS - 1) {
+                        excCount++;
+                        logger.info("Throw exception " + excCount + " on consumer " + currentThread);
+                        throw new RuntimeException("Forced error on consumer");
+                    }
+                    consumedMsgs.add(msg);
+                    logger.info("process " + msg + " in thread " + currentThread);
+                }
+            };
+            startupList.add(startConsumer(no, kafkaConsumerProperties, requestConsumer));
+        }
+        sendMessagesAndVerifyConsumption(startupList, consumedMsgs, 15000, 500);
+    }
+
+    /**
+     * Starts a consumer for the test topic as an unmanaged thread named
+     * {@code THREAD_PREFIX + no}.
+     *
+     * @param no index of the consumer, used as thread name suffix
+     * @param consumerProps Kafka consumer properties
+     * @param processor processor that receives the consumed messages
+     * @return the startup result reported by the thread manager
+     */
+    private ThreadStartupResult startConsumer(int no, Properties consumerProps, QueueMessageProcessor processor) {
+        KafkaQueueConsumer cnsmr = new KafkaQueueConsumer(topicName, consumerProps, processor);
+        return threadManager.startUnmanagedThread(THREAD_PREFIX + no, cnsmr);
+    }
+
+    /**
+     * Waits for the consumers to be ready, sends all test messages, polls until they have
+     * been consumed or the timeout expires, and then asserts that every message arrived
+     * and that all consumer threads are still running.
+     *
+     * @param startupList startup results of the consumer threads to wait for
+     * @param consumedMsgs list the message processors add consumed messages to
+     * @param maxWaitMs maximum time in milliseconds to wait for the messages to be consumed
+     * @param pollIntervalMs time in milliseconds to sleep between two checks
+     */
+    private void sendMessagesAndVerifyConsumption(List<ThreadStartupResult> startupList, List<String> consumedMsgs, int maxWaitMs, int pollIntervalMs) throws InterruptedException, ExecutionException {
+        try {
+            threadManager.waitForThreadsToBeReady(CONSUMER_STARTUP_TIMEOUT_MS, startupList);
+            for (int no = 0; no < PARTITION_COUNT; no++) {
+                for (int msgNo = 0; msgNo < MSG_PER_PARTITION; msgNo++) {
+                    sendMsg("Partition " + no + " msg " + msgNo);
+                }
+            }
+
+            int totalWait = 0;
+            while (totalWait < maxWaitMs && consumedMsgs.size() < TOTAL_MSG_COUNT) {
+                Thread.sleep(pollIntervalMs);
+                totalWait += pollIntervalMs;
+            }
+
+            logger.info("Done with all messages after " + totalWait);
+
+            Assert.assertEquals(TOTAL_MSG_COUNT, consumedMsgs.size());
+
+            for (int no = 0; no < PARTITION_COUNT; no++) {
+                final ThreadStatus.ThreadState stateOfUnmanagedThread = threadManager.getStateOfUnmanagedThread(THREAD_PREFIX + no);
+                Assert.assertEquals(ThreadStatus.ThreadState.RUNNING, stateOfUnmanagedThread);
+            }
+        } catch (TimeoutException e) {
+            Assert.fail("Consumer could not be started on time");
+        }
+    }
+
+    /**
+     * Builds the comma-separated list of brokers reported by {@link KafkaMonitor} for the
+     * configured Zookeeper connect string.
+     */
+    private static String getBrokerList() {
+        final Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperHost).iterator();
+        StringBuilder brokersString = new StringBuilder();
+        while (brokers.hasNext()) {
+            brokersString.append(brokers.next());
+            if (brokers.hasNext()) {
+                brokersString.append(",");
+            }
+        }
+        return brokersString.toString();
+    }
+
+    /**
+     * Returns the ODF Kafka consumer properties extended with the test consumer group,
+     * the Zookeeper and broker addresses, disabled auto commit and String deserializers.
+     */
+    private Properties getConsumerProps() {
+        SettingsManager odfConfig = new ODFFactory().create().getSettingsManager();
+        Properties kafkaConsumerProperties = odfConfig.getKafkaConsumerProperties();
+        final String groupId = "retrying-dummy-consumer";
+        kafkaConsumerProperties.put("group.id", groupId);
+        kafkaConsumerProperties.put("zookeeper.connect", zookeeperHost);
+        kafkaConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList());
+        kafkaConsumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        kafkaConsumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        kafkaConsumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+
+        return kafkaConsumerProperties;
+    }
+
+    /**
+     * Sends a single message with a random UUID key to the test topic using a new producer
+     * and blocks until the send completes. The producer is closed even if the send fails.
+     *
+     * @param msg the message payload
+     * @throws InterruptedException if the thread is interrupted while waiting for the send result
+     * @throws ExecutionException if the send fails
+     * @throws TimeoutException if the send does not complete within 3 seconds
+     */
+    void sendMsg(String msg) throws InterruptedException, ExecutionException, TimeoutException {
+        SettingsManager odfConfig = new ODFFactory().create().getSettingsManager();
+        Properties props = odfConfig.getKafkaProducerProperties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList());
+        //Should we use a custom partitioner? we could try to involve consumer offsets and always put on "emptiest" partition
+        //props.put("partitioner.class", TestMessagePartitioner.class);
+
+        final KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
+        try {
+            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, UUID.randomUUID().toString(), msg);
+            producer.send(producerRecord).get(3000, TimeUnit.MILLISECONDS);
+        } finally {
+            producer.close();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceErrorTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceErrorTest.java b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceErrorTest.java
new file mode 100755
index 0000000..d1c9810
--- /dev/null
+++ b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceErrorTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest;
+
+public class ParallelServiceErrorTest extends ODFTestcase {
+ private static final int NUMBER_OF_QUEUED_REQUESTS = 1;
+ Logger log = ODFTestLogger.get();
+
+ @Test
+ public void runDataSetsInParallelError() throws Exception {
+ runDataSetsInParallelAndCheckResult(Arrays.asList(new String[] { "successID1", "errorID2" }), AnalysisRequestStatus.State.FINISHED, AnalysisRequestStatus.State.ERROR);
+ }
+
+ private void runDataSetsInParallelAndCheckResult(List<String> dataSetIDs, AnalysisRequestStatus.State... expectedState) throws Exception {
+ log.info("Running data sets in parallel: " + dataSetIDs);
+ log.info("Expected state: " + expectedState);
+ AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+
+ List<AnalysisRequest> requests = new ArrayList<AnalysisRequest>();
+ List<AnalysisResponse> responses = new ArrayList<AnalysisResponse>();
+ List<String> idList = new ArrayList<String>();
+
+ for (int no = 0; no < NUMBER_OF_QUEUED_REQUESTS; no++) {
+ for (String dataSet : dataSetIDs) {
+ final AnalysisRequest req = ODFAPITest.createAnalysisRequest(Arrays.asList(dataSet + UUID.randomUUID().toString()));
+ AnalysisResponse resp = analysisManager.runAnalysis(req);
+ req.setId(resp.getId());
+ requests.add(req);
+ idList.add(resp.getId());
+ responses.add(resp);
+ }
+ }
+ log.info("Parallel requests started: " + idList.toString());
+
+ Assert.assertEquals(NUMBER_OF_QUEUED_REQUESTS * dataSetIDs.size(), requests.size());
+ Assert.assertEquals(NUMBER_OF_QUEUED_REQUESTS * dataSetIDs.size(), responses.size());
+
+ // check that requests are processed in parallel:
+ // there must be a point in time where both requests are in status "active"
+ log.info("Polling for status of parallel request...");
+ boolean foundPointInTimeWhereBothRequestsAreActive = false;
+ int maxPolls = ODFAPITest.MAX_NUMBER_OF_POLLS;
+ List<AnalysisRequestStatus.State> allSingleStates = new ArrayList<AnalysisRequestStatus.State>();
+ do {
+ int foundActive = 0;
+ allSingleStates.clear();
+ for (AnalysisRequest request : requests) {
+ final AnalysisRequestStatus.State state = analysisManager.getAnalysisRequestStatus(request.getId()).getState();
+ if (state == AnalysisRequestStatus.State.ACTIVE) {
+ log.info("ACTIVE: " + request.getId() + " foundactive: " + foundActive);
+ foundActive++;
+ } else {
+ log.info("NOT ACTIVE " + request.getId() + " _ " + state);
+ }
+ allSingleStates.add(state);
+ }
+ if (foundActive > 1) {
+ foundPointInTimeWhereBothRequestsAreActive = true;
+ }
+
+ maxPolls--;
+ Thread.sleep(ODFAPITest.WAIT_MS_BETWEEN_POLLING);
+ } while (maxPolls > 0 && Utils.containsNone(allSingleStates, new AnalysisRequestStatus.State[] { AnalysisRequestStatus.State.ACTIVE, AnalysisRequestStatus.State.QUEUED }));
+
+ Assert.assertTrue(maxPolls > 0);
+ Assert.assertTrue(foundPointInTimeWhereBothRequestsAreActive);
+ Assert.assertTrue(allSingleStates.containsAll(Arrays.asList(expectedState)));
+ }
+}