You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/18 23:01:22 UTC
[25/47] samza git commit: added JobModelManager to ThreadJob
added JobModelManager to ThreadJob
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/22034947
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/22034947
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/22034947
Branch: refs/heads/NewKafkaSystemConsumer
Commit: 22034947b998d3604bc3911a417b9c1e761bb90f
Parents: c14557f
Author: Boris S <bo...@apache.org>
Authored: Fri Aug 31 14:36:51 2018 -0700
Committer: Boris S <bo...@apache.org>
Committed: Fri Aug 31 14:36:51 2018 -0700
----------------------------------------------------------------------
.../stream/CoordinatorStreamSystemConsumer.java | 4 +-
.../org/apache/samza/job/local/ThreadJob.scala | 5 +-
.../samza/job/local/ThreadJobFactory.scala | 2 +-
.../apache/samza/job/local/TestThreadJob.scala | 9 ++
.../system/kafka/NewKafkaSystemConsumer.java | 121 +++++++++----------
.../integration/TestShutdownStatefulTask.scala | 4 +-
6 files changed, 75 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index 38255a2..0bdb874 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -176,7 +176,7 @@ public class CoordinatorStreamSystemConsumer {
valueMap = messageSerde.fromBytes((byte[]) envelope.getMessage());
}
CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap);
- log.debug("Received coordinator stream message: {}", coordinatorStreamMessage);
+ log.info("Received coordinator stream message: {}", coordinatorStreamMessage);
// Remove any existing entry. Set.add() does not add if the element already exists.
if (bootstrappedMessages.remove(coordinatorStreamMessage)) {
log.debug("Removed duplicate message: {}", coordinatorStreamMessage);
@@ -194,7 +194,7 @@ public class CoordinatorStreamSystemConsumer {
}
bootstrappedStreamSet = Collections.unmodifiableSet(bootstrappedMessages);
- log.debug("Bootstrapped configuration: {}", configMap);
+ log.info("Bootstrapped configuration: {}", configMap);
isBootstrapped = true;
} catch (Exception e) {
throw new SamzaException(e);
http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
index a61a297..33dde52 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
@@ -19,11 +19,12 @@
package org.apache.samza.job.local
+import org.apache.samza.coordinator.JobModelManager
import org.apache.samza.job.ApplicationStatus.{New, Running, SuccessfulFinish, UnsuccessfulFinish}
import org.apache.samza.job.{ApplicationStatus, StreamJob}
import org.apache.samza.util.Logging
-class ThreadJob(runnable: Runnable) extends StreamJob with Logging {
+class ThreadJob(runnable: Runnable, val jobModelManager: JobModelManager) extends StreamJob with Logging {
@volatile var jobStatus: Option[ApplicationStatus] = None
var thread: Thread = null
@@ -43,6 +44,8 @@ class ThreadJob(runnable: Runnable) extends StreamJob with Logging {
jobStatus = Some(UnsuccessfulFinish)
throw e
}
+ } finally {
+ jobModelManager.stop
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 0b472aa..4b08721 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -110,7 +110,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
taskFactory)
container.setContainerListener(containerListener)
- val threadJob = new ThreadJob(container)
+ val threadJob = new ThreadJob(container, coordinator)
threadJob
} finally {
coordinator.stop
http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
index 4f3f511..b1de215 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
@@ -19,6 +19,7 @@
package org.apache.samza.job.local
+import org.apache.samza.coordinator.JobModelManager
import org.junit.Assert._
import org.junit.Test
import org.apache.samza.job.ApplicationStatus
@@ -29,6 +30,10 @@ class TestThreadJob {
val job = new ThreadJob(new Runnable {
override def run {
}
+ }, new JobModelManager(null) {
+ override def stop: Unit = {
+
+ }
})
job.submit
job.waitForFinish(999999)
@@ -40,6 +45,10 @@ class TestThreadJob {
override def run {
Thread.sleep(999999)
}
+ }, new JobModelManager(null) {
+ override def stop: Unit = {
+
+ }
})
job.submit
job.waitForFinish(500)
http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
index b745628..e34812f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
@@ -1,3 +1,4 @@
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,47 +22,38 @@
package org.apache.samza.system.kafka;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
import kafka.common.TopicAndPartition;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.KafkaConsumerConfig;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.Deserializer;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
-import org.apache.samza.config.StreamConfig;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
-import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.BlockingEnvelopeMap;
import org.apache.samza.util.Clock;
-import org.apache.samza.util.KafkaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
-import scala.collection.JavaConversions;
-public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements SystemConsumer{
+public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements SystemConsumer {
private static final Logger LOG = LoggerFactory.getLogger(NewKafkaSystemConsumer.class);
private static final long FETCH_THRESHOLD = 50000;
private static final long FETCH_THRESHOLD_BYTES = -1L;
- private final Consumer<K,V> kafkaConsumer;
+ private final Consumer<K, V> kafkaConsumer;
private final String systemName;
private final KafkaSystemConsumerMetrics samzaConsumerMetrics;
private final String clientId;
@@ -78,8 +70,8 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
private KafkaConsumerProxy proxy;
/* package private */final Map<TopicPartition, String> topicPartitions2Offset = new HashMap<>();
- /* package private */long perPartitionFetchThreshold;
- /* package private */long perPartitionFetchThresholdBytes;
+ /* package private */ long perPartitionFetchThreshold;
+ /* package private */ long perPartitionFetchThresholdBytes;
// TODO - consider new class for KafkaSystemConsumerMetrics
@@ -88,15 +80,10 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
* @param config
* @param metrics
*/
- public NewKafkaSystemConsumer(
- Consumer<K,V> kafkaConsumer,
- String systemName,
- Config config,
- String clientId,
- KafkaSystemConsumerMetrics metrics,
- Clock clock) {
+ protected NewKafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId,
+ KafkaSystemConsumerMetrics metrics, Clock clock) {
- super(metrics.registry(),clock, metrics.getClass().getName());
+ super(metrics.registry(), clock, metrics.getClass().getName());
this.samzaConsumerMetrics = metrics;
this.clientId = clientId;
@@ -109,26 +96,20 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
this.fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
LOG.info(String.format(
- "Created SamzaLiKafkaSystemConsumer for system=%s, clientId=%s, metricName=%s with liKafkaConsumer=%s",
- systemName, clientId, metricName, this.kafkaConsumer.toString()));
+ "Created SamzaKafkaSystemConsumer for system=%s, clientId=%s, metricName=%s with KafkaConsumer=%s", systemName,
+ clientId, metricName, this.kafkaConsumer.toString()));
}
- public static <K, V> NewKafkaSystemConsumer getNewKafkaSystemConsumer(
- String systemName,
- Config config,
- String clientId,
- KafkaSystemConsumerMetrics metrics,
- Clock clock) {
+ public static <K, V> NewKafkaSystemConsumer getNewKafkaSystemConsumer(String systemName, Config config,
+ String clientId, KafkaSystemConsumerMetrics metrics, Clock clock) {
+
+ System.out.println("GETTING FOR " + systemName);
+ System.out.printf("RETURNING NEW ONE");
// extract consumer configs and create kafka consumer
KafkaConsumer<K, V> kafkaConsumer = getKafkaConsumerImpl(systemName, clientId, config);
- return new NewKafkaSystemConsumer(kafkaConsumer,
- systemName,
- config,
- clientId,
- metrics,
- clock);
+ return new NewKafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, clock);
}
/**
@@ -146,7 +127,8 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
KafkaConsumerConfig consumerConfig =
KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId, injectProps);
- LOG.info("==============>Consumer properties in getKafkaConsumerImpl: systemName: {}, consumerProperties: {}", systemName, consumerConfig.originals());
+ LOG.info("==============>Consumer properties in getKafkaConsumerImpl: systemName: {}, consumerProperties: {}",
+ systemName, consumerConfig.originals());
return new KafkaConsumer<>(consumerConfig.originals());
}
@@ -157,7 +139,7 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
LOG.warn("attempting to start the consumer for the second (or more) time.");
return;
}
- if(stopped.get()) {
+ if (stopped.get()) {
LOG.warn("attempting to start a stopped consumer");
return;
}
@@ -197,8 +179,7 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
messageSink = new KafkaConsumerMessageSink();
// create the thread with the consumer
- proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink,
- samzaConsumerMetrics, metricName);
+ proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, samzaConsumerMetrics, metricName);
LOG.info("==============>Created consumer proxy: " + proxy);
}
@@ -231,8 +212,10 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
proxy.addTopicPartition(topicPartitions2SSP.get(tp), startingOffset);
});
+ System.out.println("#####################started " + this + "; kc=" + kafkaConsumer);
// start the proxy thread
if (proxy != null && !proxy.isRunning()) {
+ System.out.println("#####################starting proxy " + proxy);
proxy.start();
}
}
@@ -242,33 +225,37 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
KafkaConfig kafkaConfig = new KafkaConfig(config);
Option<String> fetchThresholdOption = kafkaConfig.getConsumerFetchThreshold(systemName);
long fetchThreshold = FETCH_THRESHOLD;
- if(fetchThresholdOption.isDefined()) {
+ if (fetchThresholdOption.isDefined()) {
fetchThreshold = Long.valueOf(fetchThresholdOption.get());
LOG.info("fetchThresholdOption is defined. fetchThreshold=" + fetchThreshold);
}
Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName);
long fetchThresholdBytes = FETCH_THRESHOLD_BYTES;
- if(fetchThresholdBytesOption.isDefined()) {
+ if (fetchThresholdBytesOption.isDefined()) {
fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
LOG.info("fetchThresholdBytesOption is defined. fetchThresholdBytes=" + fetchThresholdBytes);
}
LOG.info("fetchThresholdBytes = " + fetchThresholdBytes + "; fetchThreshold=" + fetchThreshold);
- LOG.info("topicPartitions2Offset #=" + topicPartitions2Offset.size() + "; topicPartition2SSP #=" + topicPartitions2SSP.size());
+ LOG.info("topicPartitions2Offset #=" + topicPartitions2Offset.size() + "; topicPartition2SSP #="
+ + topicPartitions2SSP.size());
if (topicPartitions2SSP.size() > 0) {
perPartitionFetchThreshold = fetchThreshold / topicPartitions2SSP.size();
LOG.info("perPartitionFetchThreshold=" + perPartitionFetchThreshold);
- if(fetchThresholdBytesEnabled) {
+ if (fetchThresholdBytesEnabled) {
// currently this feature cannot be enabled, because we do not have the size of the messages available.
// messages get double buffered, hence divide by 2
perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / topicPartitions2SSP.size();
- LOG.info("perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes=" + perPartitionFetchThresholdBytes);
+ LOG.info("perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes="
+ + perPartitionFetchThresholdBytes);
}
}
}
@Override
public void stop() {
+ System.out.println("##################### stopping " + this + "; kc=" + kafkaConsumer);
+
if (!stopped.compareAndSet(false, true)) {
LOG.warn("attempting to stop stopped consumer.");
return;
@@ -276,8 +263,10 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
LOG.warn("Stopping SamzaRawLiKafkaConsumer + " + this);
// stop the proxy (with 5 minutes timeout)
- if(proxy != null)
+ if (proxy != null) {
+ System.out.println("##################### stopping proxy " + proxy);
proxy.stop(TimeUnit.MINUTES.toMillis(5));
+ }
try {
synchronized (kafkaConsumer) {
@@ -293,6 +282,14 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
*/
@Override
public void register(SystemStreamPartition systemStreamPartition, String offset) {
+ if (started.get()) {
+ String msg =
+ String.format("Trying to register partition after consumer has been started. sn=%s, ssp=%s", systemName,
+ systemStreamPartition);
+ LOG.error(msg);
+ throw new SamzaException(msg);
+ }
+
if (!systemStreamPartition.getSystem().equals(systemName)) {
LOG.warn("ignoring SSP " + systemStreamPartition + ", because this consumer's system is " + systemName);
return;
@@ -332,16 +329,17 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
@Override
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
- Set<SystemStreamPartition> systemStreamPartitions, long timeout)
- throws InterruptedException {
+ Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
// check if the proxy is running
- if(!proxy.isRunning()) {
+ if (!proxy.isRunning()) {
stop();
if (proxy.getFailureCause() != null) {
String message = "LiKafkaConsumerProxy has stopped";
- if(proxy.getFailureCause() instanceof org.apache.kafka.common.errors.TopicAuthorizationException)
- message += " due to TopicAuthorizationException Please refer to go/samzaacluserguide to correctly set up acls for your topic";
+ if (proxy.getFailureCause() instanceof org.apache.kafka.common.errors.TopicAuthorizationException) {
+ message +=
+ " due to TopicAuthorizationException Please refer to go/samzaacluserguide to correctly set up acls for your topic";
+ }
throw new SamzaException(message, proxy.getFailureCause());
} else {
LOG.warn("Failure cause not populated for LiKafkaConsumerProxy");
@@ -349,7 +347,9 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
}
}
- return super.poll(systemStreamPartitions, timeout);
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> res = super.poll(systemStreamPartitions, timeout);
+ LOG.info("=============================>. Res in POLL:" + res.toString());
+ return res;
}
public static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
@@ -376,15 +376,6 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
return systemName;
}
- private static Set<SystemStream> getIntermediateStreams(Config config) {
- StreamConfig streamConfig = new StreamConfig(config);
- Collection<String> streamIds = JavaConversions.asJavaCollection(streamConfig.getStreamIds());
- return streamIds.stream()
- .filter(streamConfig::getIsIntermediateStream)
- .map(id -> streamConfig.streamIdToSystemStream(id))
- .collect(Collectors.toSet());
- }
-
////////////////////////////////////
// inner class for the message sink
////////////////////////////////////
@@ -395,10 +386,11 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
}
boolean needsMoreMessages(SystemStreamPartition ssp) {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};"
- + "(limit={}); messagesNumInQueue={}(limit={};", ssp, fetchThresholdBytesEnabled, getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes,
- getNumMessagesInQueue(ssp), perPartitionFetchThreshold);
+ + "(limit={}); messagesNumInQueue={}(limit={};", ssp, fetchThresholdBytesEnabled,
+ getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp),
+ perPartitionFetchThreshold);
}
if (fetchThresholdBytesEnabled) {
@@ -415,8 +407,7 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
put(ssp, envelope);
} catch (InterruptedException e) {
throw new SamzaException(
- String.format("Interrupted while trying to add message with offset %s for ssp %s",
- envelope.getOffset(),
+ String.format("Interrupted while trying to add message with offset %s for ssp %s", envelope.getOffset(),
ssp));
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
index e4d47d1..a42433c 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
@@ -82,13 +82,15 @@ class TestShutdownStatefulTask extends StreamTaskTestUtil {
assertEquals(0, task.received.size)
// Send some messages to input stream.
+ System.out.println("************************BEFORE DONE sending")
send(task, "1")
+ System.out.println("************************FIRST DONE sending")
send(task, "2")
send(task, "3")
send(task, "2")
send(task, "99")
send(task, "99")
-
+ System.out.println("************************DONE sending")
stopJob(job)
}