You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/07/25 00:40:56 UTC
git commit: SAMZA-245; improve system consumers performance
Repository: incubator-samza
Updated Branches:
refs/heads/master 5c65b03a4 -> 7cecf0aef
SAMZA-245; improve system consumers performance
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/7cecf0ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/7cecf0ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/7cecf0ae
Branch: refs/heads/master
Commit: 7cecf0aef9fb968b0851746bdb01eaca1dc81050
Parents: 5c65b03
Author: Chris Riccomini <criccomi@criccomi-mn.(none)>
Authored: Thu Jul 24 15:40:49 2014 -0700
Committer: Chris Riccomini <criccomi@criccomi-mn.(none)>
Committed: Thu Jul 24 15:40:49 2014 -0700
----------------------------------------------------------------------
.../0.7.0/jobs/configuration-table.html | 15 +
.../org/apache/samza/system/SystemConsumer.java | 31 ++-
.../system/SystemStreamPartitionIterator.java | 21 +-
.../apache/samza/util/BlockingEnvelopeMap.java | 56 ++--
.../TestSystemStreamPartitionIterator.java | 22 +-
.../samza/util/TestBlockingEnvelopeMap.java | 40 +--
.../org/apache/samza/config/TaskConfig.scala | 19 ++
.../apache/samza/container/SamzaContainer.scala | 13 +-
.../apache/samza/system/SystemConsumers.scala | 272 ++++++++++---------
.../samza/system/SystemConsumersMetrics.scala | 12 +-
.../chooser/BufferingMessageChooser.scala | 208 --------------
.../system/chooser/RoundRobinChooser.scala | 27 +-
.../org/apache/samza/util/DoublingBackOff.scala | 62 -----
.../samza/system/TestSystemConsumers.scala | 181 ++++++++++--
.../chooser/TestBufferingMessageChooser.scala | 108 --------
.../TestFileReaderSystemConsumer.scala | 45 +--
.../apache/samza/util/TestDoublingBackOff.scala | 61 -----
.../test/performance/TestPerformanceTask.scala | 4 +-
.../TestSamzaContainerPerformance.scala | 2 +-
19 files changed, 453 insertions(+), 746 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/docs/learn/documentation/0.7.0/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/jobs/configuration-table.html b/docs/learn/documentation/0.7.0/jobs/configuration-table.html
index edcb74f..41f334f 100644
--- a/docs/learn/documentation/0.7.0/jobs/configuration-table.html
+++ b/docs/learn/documentation/0.7.0/jobs/configuration-table.html
@@ -377,6 +377,21 @@
</tr>
<tr>
+ <td class="property" id="task-poll-interval-ms">task.poll.interval.ms</td>
+ <td class="default">50</td>
+ <td class="description">
+ Samza's container polls for more messages under two conditions. The first condition arises when there are simply no remaining
+ buffered messages to process for any input SystemStreamPartition. The second condition arises when some input
+ SystemStreamPartitions have empty buffers, but others do not. In the latter case, a polling interval is defined to determine how
+ often to refresh the empty SystemStreamPartition buffers. By default, this interval is 50ms, which means that any empty
+ SystemStreamPartition buffer will be refreshed at least once every 50ms. A higher value here means that empty SystemStreamPartitions
+ will be refreshed less often, which introduces more latency but uses less CPU and network. Decreasing this
+ value means that empty SystemStreamPartitions are refreshed more frequently, which introduces less latency but increases
+ CPU and network utilization.
+ </td>
+ </tr>
+
+ <tr>
<th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th>
</tr>
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java b/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
index 591f8fb..37c6c76 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
@@ -21,6 +21,7 @@ package org.apache.samza.system;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* <p>
@@ -137,35 +138,39 @@ public interface SystemConsumer {
* Poll the SystemConsumer to get any available messages from the underlying
* system.
*
- * <p>If the underlying implementation does not take care to adhere to the
+ * <p>
+ * If the underlying implementation does not take care to adhere to the
* timeout parameter, the SamzaContainer's performance will suffer
* drastically. Specifically, if poll blocks when it's not supposed to, it
* will block the entire main thread in SamzaContainer, and no messages will
* be processed while blocking is occurring.
+ * </p>
*
* @param systemStreamPartitions
- * A map from SystemStreamPartition to maximum number of messages to
- * return for the SystemStreamPartition. Polling with {stream1: 100,
- * stream2: 1000} tells the SystemConsumer that it can return between
- * 0 and 100 messages (inclusive) for stream1, and between 0 and 1000
- * messages for stream2. If SystemConsumer has messages available for
- * other registered SystemStreamPartitions, but they are not in the
- * systemStreamPartitions map in a given poll invocation, they can't
+ * A set of SystemStreamPartitions to poll for new messages. If
+ * SystemConsumer has messages available for other registered
+ * SystemStreamPartitions, but they are not in the
+ * systemStreamPartitions set in a given poll invocation, they can't
* be returned. It is illegal to pass in SystemStreamPartitions that
* have not been registered with the SystemConsumer first.
* @param timeout
* If timeout < 0, poll will block unless all SystemStreamPartition
* are at "head" (the underlying system has been checked, and
- * returned an empty set). If at head, an empty list is returned. If
+ * returned an empty set). If at head, an empty map is returned. If
* timeout >= 0, poll will return any messages that are currently
* available for any of the SystemStreamPartitions specified. If no
* new messages are available, it will wait up to timeout
* milliseconds for messages from any SystemStreamPartition to become
- * available. It will return an empty list if the timeout is hit, and
+ * available. It will return an empty map if the timeout is hit, and
* no new messages are available.
- * @return A list of zero or more IncomingMessageEnvelopes for the
- * SystemStreamPartitions that were supplied during the invocation.
+ * @return A map from SystemStreamPartitions to any available
+ * IncomingMessageEnvelopes for the SystemStreamPartitions. If no
+ * messages are available for a SystemStreamPartition that was
+ * supplied in the polling set, the map will not contain a key for the
+ * SystemStreamPartition. Will return an empty map, not null, if no
+ * new messages are available for any SystemStreamPartitions in the
+ * input set.
* @throws InterruptedException
*/
- List<IncomingMessageEnvelope> poll(Map<SystemStreamPartition, Integer> systemStreamPartitions, long timeout) throws InterruptedException;
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
index 9acfb10..a8f858a 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
@@ -20,22 +20,25 @@
package org.apache.samza.system;
import java.util.ArrayDeque;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
+import java.util.Set;
import org.apache.samza.SamzaException;
/**
- * {@link java.util.Iterator} that wraps a {@link org.apache.samza.system.SystemConsumer} to iterate over
- * the messages the consumer provides for the specified {@link org.apache.samza.system.SystemStreamPartition}.
+ * {@link java.util.Iterator} that wraps a
+ * {@link org.apache.samza.system.SystemConsumer} to iterate over the messages
+ * the consumer provides for the specified
+ * {@link org.apache.samza.system.SystemStreamPartition}.
*/
public class SystemStreamPartitionIterator implements Iterator<IncomingMessageEnvelope> {
private final SystemConsumer systemConsumer;
- private final Map<SystemStreamPartition, Integer> fetchMap;
+ private final Set<SystemStreamPartition> fetchSet;
private Queue<IncomingMessageEnvelope> peeks;
public SystemStreamPartitionIterator(SystemConsumer systemConsumer, SystemStreamPartition systemStreamPartition) {
@@ -44,8 +47,8 @@ public class SystemStreamPartitionIterator implements Iterator<IncomingMessageEn
public SystemStreamPartitionIterator(SystemConsumer systemConsumer, SystemStreamPartition systemStreamPartition, int fetchSize) {
this.systemConsumer = systemConsumer;
- this.fetchMap = new HashMap<SystemStreamPartition, Integer>();
- this.fetchMap.put(systemStreamPartition, fetchSize);
+ this.fetchSet = new HashSet<SystemStreamPartition>();
+ this.fetchSet.add(systemStreamPartition);
this.peeks = new ArrayDeque<IncomingMessageEnvelope>();
}
@@ -74,10 +77,10 @@ public class SystemStreamPartitionIterator implements Iterator<IncomingMessageEn
private void refresh() {
if (peeks.size() == 0) {
try {
- List<IncomingMessageEnvelope> envelopes = systemConsumer.poll(fetchMap, SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES);
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = systemConsumer.poll(fetchSet, SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES);
- if (envelopes != null && envelopes.size() > 0) {
- peeks.addAll(envelopes);
+ for (List<IncomingMessageEnvelope> systemStreamPartitionEnvelopes : envelopes.values()) {
+ peeks.addAll(systemStreamPartitionEnvelopes);
}
} catch (InterruptedException e) {
throw new SamzaException(e);
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 9503739..317e073 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -20,13 +20,16 @@
package org.apache.samza.util;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
@@ -39,8 +42,8 @@ import org.apache.samza.system.SystemStreamPartition;
* BlockingEnvelopeMap is a helper class for SystemConsumer implementations.
* Samza's poll() requirements make implementing SystemConsumers somewhat
* tricky. BlockingEnvelopeMap is provided to help other developers write
- * SystemConsumers. The intended audience is not those writing Samza jobs,
- * but rather those extending Samza to consume from new types of stream providers
+ * SystemConsumers. The intended audience is not those writing Samza jobs, but
+ * rather those extending Samza to consume from new types of stream providers
* and other systems.
* </p>
*
@@ -97,55 +100,48 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
return new LinkedBlockingQueue<IncomingMessageEnvelope>();
}
- public List<IncomingMessageEnvelope> poll(Map<SystemStreamPartition, Integer> systemStreamPartitionAndMaxPerStream, long timeout) throws InterruptedException {
+ public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
long stopTime = clock.currentTimeMillis() + timeout;
- List<IncomingMessageEnvelope> messagesToReturn = new ArrayList<IncomingMessageEnvelope>();
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messagesToReturn = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
metrics.incPoll();
- for (Map.Entry<SystemStreamPartition, Integer> systemStreamPartitionAndMaxCount : systemStreamPartitionAndMaxPerStream.entrySet()) {
- SystemStreamPartition systemStreamPartition = systemStreamPartitionAndMaxCount.getKey();
- Integer numMessages = systemStreamPartitionAndMaxCount.getValue();
+ for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) {
BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);
- IncomingMessageEnvelope envelope = null;
- List<IncomingMessageEnvelope> systemStreamPartitionMessages = new ArrayList<IncomingMessageEnvelope>();
-
- // First, drain all messages up to numMessages without blocking.
- // Stop when we've filled the request (max numMessages), or when
- // we get a null envelope back.
- for (int i = 0; i < numMessages && (i == 0 || envelope != null); ++i) {
- envelope = queue.poll();
+ List<IncomingMessageEnvelope> outgoingList = new ArrayList<IncomingMessageEnvelope>(queue.size());
- if (envelope != null) {
- systemStreamPartitionMessages.add(envelope);
- }
- }
+ if (queue.size() > 0) {
+ queue.drainTo(outgoingList);
+ } else if (timeout != 0) {
+ IncomingMessageEnvelope envelope = null;
- // Now block if blocking is allowed and we have no messages.
- if (systemStreamPartitionMessages.size() == 0) {
// How long we can legally block (if timeout > 0)
long timeRemaining = stopTime - clock.currentTimeMillis();
if (timeout == SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES) {
- while (systemStreamPartitionMessages.size() < numMessages && !isAtHead(systemStreamPartition)) {
+ // Block until we get at least one message, or until we catch up to
+ // the head of the stream.
+ while (envelope == null && !isAtHead(systemStreamPartition)) {
metrics.incBlockingPoll(systemStreamPartition);
envelope = queue.poll(1000, TimeUnit.MILLISECONDS);
-
- if (envelope != null) {
- systemStreamPartitionMessages.add(envelope);
- }
}
} else if (timeout > 0 && timeRemaining > 0) {
+ // Block until we get at least one message.
metrics.incBlockingTimeoutPoll(systemStreamPartition);
envelope = queue.poll(timeRemaining, TimeUnit.MILLISECONDS);
+ }
- if (envelope != null) {
- systemStreamPartitionMessages.add(envelope);
- }
+ // If we got a message, add it.
+ if (envelope != null) {
+ outgoingList.add(envelope);
+ // Drain any remaining messages without blocking.
+ queue.drainTo(outgoingList);
}
}
- messagesToReturn.addAll(systemStreamPartitionMessages);
+ if (outgoingList.size() > 0) {
+ messagesToReturn.put(systemStreamPartition, outgoingList);
+ }
}
return messagesToReturn;
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java b/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java
index 3ecabab..5af2a11 100644
--- a/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java
+++ b/samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java
@@ -24,13 +24,15 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-
-import org.junit.Test;
+import java.util.Queue;
+import java.util.Set;
import org.apache.samza.Partition;
+import org.junit.Test;
public class TestSystemStreamPartitionIterator {
private static final SystemStreamPartition SSP = new SystemStreamPartition("test", "test", new Partition(0));
@@ -105,14 +107,20 @@ public class TestSystemStreamPartitionIterator {
}
@Override
- public List<IncomingMessageEnvelope> poll(Map<SystemStreamPartition, Integer> systemStreamPartitions, long timeout) {
- List<IncomingMessageEnvelope> list = new ArrayList<IncomingMessageEnvelope>();
+ public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) {
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> systemStreamPartitionEnvelopes = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
+
+ for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) {
+ List<IncomingMessageEnvelope> q = new ArrayList<IncomingMessageEnvelope>();
+
+ if (numPollReturnsWithMessages-- > 0) {
+ q.add(new IncomingMessageEnvelope(SSP, "", null, numPollReturnsWithMessages));
+ }
- if (numPollReturnsWithMessages-- > 0) {
- list.add(new IncomingMessageEnvelope(SSP, "", null, numPollReturnsWithMessages));
+ systemStreamPartitionEnvelopes.put(systemStreamPartition, q);
}
- return list;
+ return systemStreamPartitionEnvelopes;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
index cb4d148..35ba52d 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
@@ -19,26 +19,29 @@
package org.apache.samza.util;
-import static org.junit.Assert.*;
-import java.util.HashMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.junit.Test;
import org.apache.samza.Partition;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
public class TestBlockingEnvelopeMap {
private static final SystemStreamPartition SSP = new SystemStreamPartition("test", "test", new Partition(0));
private static final IncomingMessageEnvelope envelope = new IncomingMessageEnvelope(SSP, null, null, null);
- private static final Map<SystemStreamPartition, Integer> FETCH = new HashMap<SystemStreamPartition, Integer>();
+ private static final Set<SystemStreamPartition> FETCH = new HashSet<SystemStreamPartition>();
static {
- FETCH.put(SSP, 10);
+ FETCH.add(SSP);
}
@Test
@@ -65,33 +68,14 @@ public class TestBlockingEnvelopeMap {
BlockingEnvelopeMap map = new MockBlockingEnvelopeMap();
map.register(SSP, "0");
map.put(SSP, envelope);
- List<IncomingMessageEnvelope> envelopes = map.poll(FETCH, 0);
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = map.poll(FETCH, 0);
assertEquals(1, envelopes.size());
+ assertEquals(1, envelopes.get(SSP).size());
map.put(SSP, envelope);
map.put(SSP, envelope);
envelopes = map.poll(FETCH, 0);
- assertEquals(2, envelopes.size());
- }
-
- @Test
- public void testShouldNotReturnMoreEnvelopesThanAllowed() throws InterruptedException {
- BlockingEnvelopeMap map = new MockBlockingEnvelopeMap();
- int maxMessages = FETCH.get(SSP);
-
- map.register(SSP, "0");
-
- for (int i = 0; i < 3 * maxMessages; ++i) {
- map.put(SSP, envelope);
- }
-
- assertEquals(3 * maxMessages, map.getNumMessagesInQueue(SSP));
- assertEquals(maxMessages, map.poll(FETCH, 0).size());
- assertEquals(2 * maxMessages, map.getNumMessagesInQueue(SSP));
- assertEquals(maxMessages, map.poll(FETCH, 30).size());
- assertEquals(maxMessages, map.getNumMessagesInQueue(SSP));
- assertEquals(maxMessages, map.poll(FETCH, 0).size());
- assertEquals(0, map.getNumMessagesInQueue(SSP));
- assertEquals(0, map.poll(FETCH, 0).size());
+ assertEquals(1, envelopes.size());
+ assertEquals(2, envelopes.get(SSP).size());
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 8b881f2..21d8903 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -36,6 +36,23 @@ object TaskConfig {
val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // define whether drop the messages or not when deserialization fails
val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails
+ /**
+ * Samza's container polls for more messages under two conditions. The first
+ * condition arises when there are simply no remaining buffered messages to
+ * process for any input SystemStreamPartition. The second condition arises
+ * when some input SystemStreamPartitions have empty buffers, but others do
+ * not. In the latter case, a polling interval is defined to determine how
+ * often to refresh the empty SystemStreamPartition buffers. By default,
+ * this interval is 50ms, which means that any empty SystemStreamPartition
+ * buffer will be refreshed at least once every 50ms. A higher value here
+ * means that empty SystemStreamPartitions will be refreshed less often,
+ * which introduces more latency but uses less CPU and network.
+ * Decreasing this value means that empty SystemStreamPartitions are
+ * refreshed more frequently, which introduces less latency but
+ * increases CPU and network utilization.
+ */
+ val POLL_INTERVAL_MS = "task.poll.interval.ms"
+
implicit def Config2Task(config: Config) = new TaskConfig(config)
}
@@ -76,4 +93,6 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) {
def getDropDeserialization = getOption(TaskConfig.DROP_DESERIALIZATION_ERROR)
def getDropSerialization = getOption(TaskConfig.DROP_SERIALIZATION_ERROR)
+
+ def getPollIntervalMs = getOption(TaskConfig.POLL_INTERVAL_MS)
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index bff6000..a7142b2 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -306,23 +306,28 @@ object SamzaContainer extends Logging {
info("Got offset manager: %s" format offsetManager)
- val dropDeserializationError: Boolean = config.getDropDeserialization match {
+ val dropDeserializationError = config.getDropDeserialization match {
case Some(dropError) => dropError.toBoolean
case _ => false
}
- val dropSerializationError: Boolean = config.getDropSerialization match {
+ val dropSerializationError = config.getDropSerialization match {
case Some(dropError) => dropError.toBoolean
case _ => false
}
+ val pollIntervalMs = config
+ .getPollIntervalMs
+ .getOrElse(SystemConsumers.DEFAULT_POLL_INTERVAL_MS.toString)
+ .toInt
+
val consumerMultiplexer = new SystemConsumers(
- // TODO add config values for no new message timeout and max msgs per stream partition
chooser = chooser,
consumers = consumers,
serdeManager = serdeManager,
metrics = systemConsumersMetrics,
- dropDeserializationError = dropDeserializationError)
+ dropDeserializationError = dropDeserializationError,
+ pollIntervalMs = pollIntervalMs)
val producerMultiplexer = new SystemProducers(
producers = producers,
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 9eb70f2..fef7227 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -20,13 +20,21 @@
package org.apache.samza.system
import scala.collection.JavaConversions._
-import scala.collection.mutable.Queue
import org.apache.samza.serializers.SerdeManager
import grizzled.slf4j.Logging
import org.apache.samza.system.chooser.MessageChooser
-import org.apache.samza.util.DoublingBackOff
-import org.apache.samza.system.chooser.BufferingMessageChooser
import org.apache.samza.SamzaException
+import java.util.HashMap
+import java.util.ArrayDeque
+import java.util.Queue
+import java.util.Set
+import java.util.HashSet
+
+object SystemConsumers {
+ val DEFAULT_POLL_INTERVAL_MS = 50
+ val DEFAULT_NO_NEW_MESSAGES_TIMEOUT = 10
+ val DEFAULT_DROP_SERIALIZATION_ERROR = false
+}
/**
* The SystemConsumers class coordinates between all SystemConsumers, the
@@ -59,98 +67,100 @@ class SystemConsumers(
metrics: SystemConsumersMetrics = new SystemConsumersMetrics,
/**
- * The maximum number of messages to poll from a single SystemStreamPartition.
- */
- maxMsgsPerStreamPartition: Int = 10000,
-
- /**
* If MessageChooser returns null when it's polled, SystemConsumers will
* poll each SystemConsumer with a timeout next time it tries to poll for
* messages. Setting the timeout to 0 means that SamzaContainer's main
* thread will sit in a tight loop polling every SystemConsumer over and
* over again if no new messages are available.
*/
- noNewMessagesTimeout: Long = 10,
+ noNewMessagesTimeout: Int = SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
/**
- * This parameter is to define how to deal with deserialization failure. If set to true,
- * the task will skip the messages when deserialization fails. If set to false, the task
- * will throw SamzaException and fail the container.
+ * This parameter is to define how to deal with deserialization failure. If
+ * set to true, the task will skip the messages when deserialization fails.
+ * If set to false, the task will throw SamzaException and fail the container.
*/
- dropDeserializationError: Boolean = false) extends Logging {
+ dropDeserializationError: Boolean = SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
/**
- * The buffer where SystemConsumers stores all incoming message envelopes.
+ * <p>Defines an upper bound for how long the SystemConsumers will wait
+ * before polling systems for more data. The default setting is 50ms, which
+ * means that SystemConsumers will poll for new messages for all
+ * SystemStreamPartitions with empty buffers every 50ms. SystemConsumers
+ * will also poll for new messages any time that there are no available
+ * messages to process, or any time the MessageChooser returns a null
+ * IncomingMessageEnvelope.</p>
+ *
+ * <p>This parameter also implicitly defines how much latency is introduced
+ * by SystemConsumers. If a message is available for a SystemStreamPartition
+ * with no remaining unprocessed messages, SystemConsumers will poll for it
+ * within pollIntervalMs (50ms by default) of its availability in the stream system.</p>
*/
- val buffer = new BufferingMessageChooser(chooser)
+ pollIntervalMs: Int = SystemConsumers.DEFAULT_POLL_INTERVAL_MS,
/**
- * A map of every SystemStreamPartition that SystemConsumers is responsible
- * for polling. The values are how many messages to poll for during the next
- * SystemConsumers.poll call.
- *
- * If the value for a SystemStreamPartition is maxMsgsPerStreamPartition,
- * then the implication is that SystemConsumers has no incoming messages in
- * its buffer for the SystemStreamPartition. If the value is 0 then the
- * SystemConsumers' buffer is full for the SystemStreamPartition.
+ * Clock can be used to inject a custom clock when mocking this class in
+ * tests. The default implementation returns the current system clock time.
*/
- var fetchMap = Map[SystemStreamPartition, java.lang.Integer]()
+ clock: () => Long = () => System.currentTimeMillis) extends Logging {
/**
- * A cache of fetchMap values, grouped according to the system. This is
- * purely a trick to get better performance out of the SystemConsumsers
- * class, since the map from systemName to its fetchMap is used for every
- * poll call.
+ * A buffer of incoming messages grouped by SystemStreamPartition. These
+ * messages are handed out to the MessageChooser as it needs them.
*/
- var systemFetchMapCache = Map[String, Map[SystemStreamPartition, java.lang.Integer]]()
+ private val unprocessedMessagesBySSP = new HashMap[SystemStreamPartition, Queue[IncomingMessageEnvelope]]()
+
+ /**
+ * A set of SystemStreamPartitions grouped by systemName. This is used as a
+ * cache to figure out which SystemStreamPartitions we need to poll from the
+ * underlying system consumer.
+ */
+ private val emptySystemStreamPartitionsBySystem = new HashMap[String, Set[SystemStreamPartition]]()
/**
* Default timeout to noNewMessagesTimeout. Every time SystemConsumers
- * receives incoming messages, it sets timout to 0. Every time
+ * receives incoming messages, it sets timeout to 0. Every time
* SystemConsumers receives no new incoming messages from the MessageChooser,
* it sets timeout to noNewMessagesTimeout again.
*/
var timeout = noNewMessagesTimeout
/**
- * Make the maximum backoff proportional to the number of streams we're consuming.
- * For a few streams, make the max back off 1, but for hundreds make it up to 1k,
- * which experimentally has shown to be the most performant.
+ * The last time that systems were polled for new messages.
*/
- var maxBackOff = 0
+ var lastPollMs = 0L
/**
- * How low totalUnprocessedMessages has to get before the consumers are
- * polled for more data. This is defined to be 10% of
- * maxMsgsPerStreamPartition. Since maxMsgsPerStreamPartition defaults to
- * 10000, the default refreshThreshold is 1000.
+ * Total number of unprocessed messages in unprocessedMessagesBySSP.
*/
- val refreshThreshold = maxMsgsPerStreamPartition * .1
+ var totalUnprocessedMessages = 0
debug("Got stream consumers: %s" format consumers)
- debug("Got max messages per stream: %s" format maxMsgsPerStreamPartition)
debug("Got no new message timeout: %s" format noNewMessagesTimeout)
- metrics.setUnprocessedMessages(() => buffer.unprocessedMessages.size)
- metrics.setNeededByChooser(() => buffer.neededByChooser.size)
metrics.setTimeout(() => timeout)
- metrics.setMaxMessagesPerStreamPartition(() => maxMsgsPerStreamPartition)
- metrics.setNoNewMessagesTimeout(() => noNewMessagesTimeout)
+ metrics.setNeededByChooser(() => emptySystemStreamPartitionsBySystem.size)
+ metrics.setUnprocessedMessages(() => totalUnprocessedMessages)
def start {
debug("Starting consumers.")
- maxBackOff = scala.math.pow(10, scala.math.log10(fetchMap.size).toInt).toInt
-
- debug("Got maxBackOff: " + maxBackOff)
+ emptySystemStreamPartitionsBySystem ++= unprocessedMessagesBySSP
+ .keySet
+ .groupBy(_.getSystem)
+ .mapValues(systemStreamPartitions => new HashSet(systemStreamPartitions.toSeq))
consumers
.keySet
.foreach(metrics.registerSystem)
- consumers.values.foreach(_.start)
+ consumers
+ .values
+ .foreach(_.start)
+
+ chooser.start
- buffer.start
+ refresh
}
def stop {
@@ -158,15 +168,14 @@ class SystemConsumers(
consumers.values.foreach(_.stop)
- buffer.stop
+ chooser.stop
}
def register(systemStreamPartition: SystemStreamPartition, offset: String) {
debug("Registering stream: %s, %s" format (systemStreamPartition, offset))
-
metrics.registerSystemStream(systemStreamPartition.getSystemStream)
- buffer.register(systemStreamPartition, offset)
- updateFetchMap(systemStreamPartition, maxMsgsPerStreamPartition)
+ unprocessedMessagesBySSP.put(systemStreamPartition, new ArrayDeque[IncomingMessageEnvelope]())
+ chooser.register(systemStreamPartition, offset)
try {
consumers(systemStreamPartition.getSystem).register(systemStreamPartition, offset)
@@ -175,135 +184,128 @@ class SystemConsumers(
}
}
- /**
- * Needs to be be lazy so that we are sure to get the value of maxBackOff assigned
- * in start(), rather than its initial value.
- */
- lazy val refresh = new DoublingBackOff(maxBackOff) {
- def call(): Boolean = {
- debug("Refreshing chooser with new messages.")
-
- // Poll every system for new messages.
- consumers.keys.map(poll(_)).contains(true)
- }
- }
-
def choose: IncomingMessageEnvelope = {
- val envelopeFromChooser = buffer.choose
+ val envelopeFromChooser = chooser.choose
if (envelopeFromChooser == null) {
debug("Chooser returned null.")
metrics.choseNull.inc
- // Allow blocking if the chooser didn't choose a message.
+ // Sleep for a while so we don't poll in a tight loop.
timeout = noNewMessagesTimeout
} else {
- debug("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
+ val systemStreamPartition = envelopeFromChooser.getSystemStreamPartition
- metrics.choseObject.inc
+ debug("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
- // Don't block if we have a message to process.
+ // Ok to give the chooser a new message from this stream.
timeout = 0
-
+ metrics.choseObject.inc
metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition.getSystemStream).inc
+
+ if (!update(systemStreamPartition)) {
+ emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).add(systemStreamPartition)
+ }
}
- // Always refresh if we got nothing from the chooser. Otherwise, just
- // refresh when the buffer is low.
- if (envelopeFromChooser == null || buffer.totalUnprocessedMessages <= refreshThreshold) {
- refresh.maybeCall()
+ if (envelopeFromChooser == null || lastPollMs < clock() - pollIntervalMs) {
+ refresh
}
- updateMessageChooser
envelopeFromChooser
}
/**
- * Poll a system for new messages from SystemStreamPartitions that have
- * dipped below the depletedQueueSizeThreshold threshold. Return true if
- * any envelopes were found, false if none.
+ * Poll the consumer for systemName for all of its SystemStreamPartitions that
+ * currently have no unprocessed messages. Systems with no registered SSPs are skipped.
*/
- private def poll(systemName: String): Boolean = {
+ private def poll(systemName: String) {
debug("Polling system consumer: %s" format systemName)
metrics.systemPolls(systemName).inc
- val consumer = consumers(systemName)
-
debug("Getting fetch map for system: %s" format systemName)
- val systemFetchMap = systemFetchMapCache(systemName)
+ val systemFetchSet = emptySystemStreamPartitionsBySystem.get(systemName)
- debug("Fetching: %s" format systemFetchMap)
+ // Poll when at least one SSP in this system needs more messages. The set is null if no SSPs were registered for this system.
+ if (systemFetchSet != null && systemFetchSet.size > 0) {
+ val consumer = consumers(systemName)
- metrics.systemStreamPartitionFetchesPerPoll(systemName).inc(systemFetchMap.size)
+ debug("Fetching: %s" format systemFetchSet)
- val incomingEnvelopes = consumer.poll(systemFetchMap, timeout)
+ metrics.systemStreamPartitionFetchesPerPoll(systemName).inc(systemFetchSet.size)
- debug("Got incoming message envelopes: %s" format incomingEnvelopes)
+ val systemStreamPartitionEnvelopes = consumer.poll(systemFetchSet, timeout)
- metrics.systemMessagesPerPoll(systemName).inc
+ debug("Got incoming message envelopes: %s" format systemStreamPartitionEnvelopes)
- // We have new un-processed envelopes, so update maps accordingly.
- incomingEnvelopes.foreach(envelope => {
- val systemStreamPartition = envelope.getSystemStreamPartition
+ metrics.systemMessagesPerPoll(systemName).inc
- val messageEnvelope = try {
- Some(serdeManager.fromBytes(envelope))
- } catch {
- case e: Exception if !dropDeserializationError => throw new SystemConsumersException("can not deserialize the message", e)
- case ex: Throwable => {
- debug("Deserialization fails: %s . Drop the error message" format ex)
- metrics.deserializationError.inc
- None
- }
- }
+ val sspAndEnvelopeIterator = systemStreamPartitionEnvelopes.entrySet.iterator
- if (!messageEnvelope.isEmpty) {
- buffer.update(messageEnvelope.get)
- }
+ while (sspAndEnvelopeIterator.hasNext) {
+ val sspAndEnvelope = sspAndEnvelopeIterator.next
+ val systemStreamPartition = sspAndEnvelope.getKey
+ val envelopes = new ArrayDeque(sspAndEnvelope.getValue)
+ val numEnvelopes = envelopes.size
+ totalUnprocessedMessages += numEnvelopes
- debug("Got message for: %s, %s" format (systemStreamPartition, envelope))
+ if (numEnvelopes > 0) {
+ unprocessedMessagesBySSP.put(systemStreamPartition, envelopes)
- updateFetchMap(systemStreamPartition, -1)
+ // Update the chooser if it needs a message for this SSP.
+ if (emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).remove(systemStreamPartition)) {
+ update(systemStreamPartition)
+ }
+ }
+ }
+ } else {
+ debug("Skipping polling for %s. Already have messages available for all registered SystemStreamPartitions." format (systemName))
+ }
+ }
- debug("Updated fetch map for: %s, %s" format (systemStreamPartition, fetchMap))
- })
+ private def refresh {
+ debug("Refreshing chooser with new messages.")
- !incomingEnvelopes.isEmpty
+ // Update last poll time so we don't poll too frequently.
+ lastPollMs = clock()
+
+ // Poll every system for new messages. foreach (not map): poll is side-effecting and returns Unit.
+ consumers.keys.foreach(poll(_))
}
/**
- * A helper method that updates both fetchMap and systemFetchMapCache
- * simultaneously. This is a convenience method to make sure that the
- * systemFetchMapCache stays in sync with fetchMap.
+ * Tries to update the message chooser with an envelope from the supplied
+ * SystemStreamPartition if an envelope is available.
*/
- private def updateFetchMap(systemStreamPartition: SystemStreamPartition, amount: Int = 1) {
- val fetchSize = fetchMap.getOrElse(systemStreamPartition, java.lang.Integer.valueOf(0)).intValue + amount
- val systemName = systemStreamPartition.getSystem
- var systemFetchMap = systemFetchMapCache.getOrElse(systemName, Map())
+ private def update(systemStreamPartition: SystemStreamPartition) = {
+ var updated = false
+ val q = unprocessedMessagesBySSP.get(systemStreamPartition)
+
+ while (q.size > 0 && !updated) {
+ val rawEnvelope = q.remove
+ val deserializedEnvelope = try {
+ Some(serdeManager.fromBytes(rawEnvelope))
+ } catch {
+ case e: Exception if !dropDeserializationError =>
+ throw new SystemConsumersException("Cannot deserialize an incoming message.", e)
+ case ex: Exception =>
+ debug("Cannot deserialize an incoming message. Dropping the error message.", ex)
+ metrics.deserializationError.inc
+ None
+ }
- if (fetchSize >= refreshThreshold) {
- systemFetchMap += systemStreamPartition -> fetchSize
- } else {
- systemFetchMap -= systemStreamPartition
- }
+ if (deserializedEnvelope.isDefined) {
+ chooser.update(deserializedEnvelope.get)
+ updated = true
+ }
- fetchMap += systemStreamPartition -> fetchSize
- systemFetchMapCache += systemName -> systemFetchMap
- }
+ totalUnprocessedMessages -= 1
+ }
- /**
- * A helper method that updates MessageChooser. This should be called in
- * "choose" method after we try to consume a message from MessageChooser.
- */
- private def updateMessageChooser {
- buffer
- .flush
- // Let the fetchMap know of any SSPs that were given to the chooser, so
- // a new fetch can be triggered if the buffer is low.
- .foreach(updateFetchMap(_))
+ updated
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
index b065ae6..a63349c 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
@@ -33,10 +33,6 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry
val systemMessagesPerPoll = scala.collection.mutable.Map[String, Counter]()
val systemStreamMessagesChosen = scala.collection.mutable.Map[SystemStream, Counter]()
- def setUnprocessedMessages(getValue: () => Int) {
- newGauge("unprocessed-messages", getValue)
- }
-
def setNeededByChooser(getValue: () => Int) {
newGauge("ssps-needed-by-chooser", getValue)
}
@@ -45,12 +41,8 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry
newGauge("poll-timeout", getValue)
}
- def setMaxMessagesPerStreamPartition(getValue: () => Int) {
- newGauge("max-buffered-messages-per-stream-partition", getValue)
- }
-
- def setNoNewMessagesTimeout(getValue: () => Long) {
- newGauge("blocking-poll-timeout", getValue)
+ def setUnprocessedMessages(getValue: () => Int) {
+ newGauge("unprocessed-messages", getValue)
}
def registerSystem(systemName: String) {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala
deleted file mode 100644
index c7ef6ef..0000000
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.chooser
-
-import scala.collection.mutable.Queue
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.IncomingMessageEnvelope
-
-/**
- * This buffer is responsible for storing new unprocessed
- * IncomingMessageEnvelopes, feeding the envelopes to the message chooser,
- * and coordinating with the chooser to pick the next message to be processed.
- */
-class BufferingMessageChooser(chooser: MessageChooser) extends MessageChooser {
- import ChooserStatus._
-
- /**
- * A buffer of incoming messages grouped by SystemStreamPartition.
- */
- var unprocessedMessages = Map[SystemStreamPartition, Queue[IncomingMessageEnvelope]]()
-
- /**
- * A count of all messages sitting in SystemConsumers' unprocessedMessages
- * buffer, and inside the MessageChooser.
- */
- var totalUnprocessedMessages = 0
-
- /**
- * A map that contains the current status for each SSP that's been
- * registered to the coordinator.
- */
- var statuses = Map[SystemStreamPartition, ChooserStatus]()
-
- /**
- * This is a cache of all SSPs that are currently in the "NeededByChooser"
- * state. It's simply here to improve performance, since it means we don't
- * need to iterate over all SSPs in the statuses map in order to determine
- * which SSPs are currently needed by the chooser.
- */
- var neededByChooser = Set[SystemStreamPartition]()
-
- /**
- * Start the chooser that this buffer is managing.
- */
- def start = chooser.start
-
- /**
- * Stop the chooser that this buffer is managing.
- */
- def stop = chooser.stop
-
- /**
- * Register a new SystemStreamPartition with this buffer, as well as the
- * underlying chooser.
- */
- def register(systemStreamPartition: SystemStreamPartition, offset: String) {
- unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]()
- chooser.register(systemStreamPartition, offset)
- setStatus(systemStreamPartition, NeededByChooser)
- }
-
- /**
- * Add a new unprocessed IncomingMessageEnvelope to the buffer. The buffer
- * will hold on to this envelope, and eventually feed it to the underlying chooser.
- */
- def update(envelope: IncomingMessageEnvelope) {
- val systemStreamPartition = envelope.getSystemStreamPartition
-
- unprocessedMessages(envelope.getSystemStreamPartition).enqueue(envelope)
- totalUnprocessedMessages += 1
-
- if (statuses(systemStreamPartition).equals(SkippingChooser)) {
- // If we were skipping messages from this SystemStreamPartition, update
- // neededByChooser since we've got messages for it now.
- setStatus(systemStreamPartition, NeededByChooser)
- }
- }
-
- /**
- * Tell the buffer to update the underlying chooser with any SSPs that the
- * chooser needs.
- *
- * @return A set of SystemStreamPartitions that were updated in the chooser
- * as a result of this method invocation.
- */
- def flush: Set[SystemStreamPartition] = {
- var updatedSystemStreamPartitions = Set[SystemStreamPartition]()
-
- neededByChooser.foreach(systemStreamPartition => {
- if (unprocessedMessages(systemStreamPartition).size > 0) {
- // If we have messages for a stream that the chooser needs, then update.
- chooser.update(unprocessedMessages(systemStreamPartition).dequeue)
- updatedSystemStreamPartitions += systemStreamPartition
- setStatus(systemStreamPartition, InChooser)
- } else {
- // If we find that we have no messages for this SystemStreamPartition,
- // rather than continue trying to update the chooser with this
- // SystemStreamPartition, add it to the skip set and remove it from
- // the neededByChooser set (see below).
- setStatus(systemStreamPartition, SkippingChooser)
- }
- })
-
- updatedSystemStreamPartitions
- }
-
- /**
- * Choose a message from the underlying chooser, and return it.
- *
- * @return The IncomingMessageEnvelope that the chooser has picked, or null
- * if the chooser didn't pick anything.
- */
- def choose = {
- val envelope = chooser.choose
-
- if (envelope != null) {
- setStatus(envelope.getSystemStreamPartition, NeededByChooser)
-
- // Chooser picked a message, so we've got one less unprocessed message.
- totalUnprocessedMessages -= 1
- }
-
- envelope
- }
-
- /**
- * Update the status of a SystemStreamPartition.
- */
- private def setStatus(systemStreamPartition: SystemStreamPartition, status: ChooserStatus) {
- statuses += systemStreamPartition -> status
-
- if (status.equals(NeededByChooser)) {
- neededByChooser += systemStreamPartition
- } else {
- neededByChooser -= systemStreamPartition
- }
- }
-}
-
-/**
- * ChooserStatus denotes the current state of a SystemStreamPartition for a
- * MessageChooser. This state is used to improve performance in the buffer.
- * To update a MessageChooser, we first check if an envelope exists for each
- * SSP that the buffer needs. If the buffer contains a lot of empty queues,
- * then the operation of iterating over all needed SSPs, and discovering that
- * their queues are empty is a waste of time, since they'll remain empty until
- * a new envelope is added to the queue, which the buffer knows by virtue of
- * having access to the enqueue method. Instead, we stop checking for an empty
- * SSP (SkippingChooser), until a new envelope is added via the enqueue method.
- */
-object ChooserStatus extends Enumeration {
- type ChooserStatus = Value
-
- /**
- * When an envelope has been updated for the MessageChooser, the
- * SystemStreamPartition for the envelope should be set to the InChooser
- * state. The state will remain this way until the MessageChooser returns an
- * envelope with the same SystemStreamPartition, at which point, the
- * SystemStreamPartition's state should be transitioned to NeededByChooser
- * (see below).
- */
- val InChooser = Value
-
- /**
- * If a SystemStreamPartition is not in the InChooser state, and it's
- * unclear if the buffer has more messages available for the SSP, the SSP
- * should be in the NeededByChooser state. This state means that the chooser
- * should be updated with a new message from the SSP, if one is available.
- */
- val NeededByChooser = Value
-
- /**
- * When a SystemStreamPartition is in the NeededByChooser state, and we try
- * to update the message chooser with a new envelope from the buffer for the
- * SSP, there are two potential outcomes. One is that there is an envelope
- * in the buffer for the SSP. In this case, the state will be transitioned
- * to InChooser. The other possibility is that there is no envelope in the
- * buffer at the time the update is trying to occur. In the latter case, the
- * SSP is transitioned to the SkippingChooser state. This state means that
- * the buffer will cease to update the chooser with envelopes from this SSP
- * until a new envelope for the SSP is added to the buffer again.
- *
- * <br/><br/>
- *
- * The reason that this state exists is purely to improve performance. If we
- * simply leave SSPs in the NeededByChooser state, we will try and update the
- * chooser on every updateChooser call for all SSPs. This is a waste of time
- * if the buffer contains a large number of empty SSPs.
- */
- val SkippingChooser = Value
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala
index 5374121..4ecf1f2 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala
@@ -26,7 +26,6 @@ import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.metrics.ReadableMetricsRegistry
-
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.metrics.MetricsHelper
@@ -42,13 +41,6 @@ import org.apache.samza.metrics.MetricsHelper
class RoundRobinChooser(metrics: RoundRobinChooserMetrics = new RoundRobinChooserMetrics) extends BaseMessageChooser {
/**
- * SystemStreamPartitions that the chooser has received a message for, but
- * have not yet returned. Envelopes for these SystemStreamPartitions should
- * be in the queue.
- */
- var inflightSystemStreamPartitions = Set[SystemStreamPartition]()
-
- /**
* Queue of potential messages to process. Round robin will always choose
* the message at the head of the queue. A queue can be used to implement
* round robin here because we only get one envelope per
@@ -61,29 +53,12 @@ class RoundRobinChooser(metrics: RoundRobinChooserMetrics = new RoundRobinChoose
}
def update(envelope: IncomingMessageEnvelope) = {
- if (inflightSystemStreamPartitions.contains(envelope.getSystemStreamPartition)) {
- throw new SamzaException("Received more than one envelope from the same "
- + "SystemStreamPartition without returning the last. This is a "
- + "violation of the contract with SystemConsumers, and breaks this "
- + "RoundRobin implementation.")
- }
-
q.add(envelope)
- inflightSystemStreamPartitions += envelope.getSystemStreamPartition
}
- def choose = {
- val envelope = q.poll
-
- if (envelope != null) {
- inflightSystemStreamPartitions -= envelope.getSystemStreamPartition
- }
-
- envelope
- }
+ def choose = q.poll
}
-
class RoundRobinChooserMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
def setBufferedMessages(getValue: () => Int) {
newGauge("buffered-messages", getValue)
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/main/scala/org/apache/samza/util/DoublingBackOff.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/DoublingBackOff.scala b/samza-core/src/main/scala/org/apache/samza/util/DoublingBackOff.scala
deleted file mode 100644
index e1d6d4c..0000000
--- a/samza-core/src/main/scala/org/apache/samza/util/DoublingBackOff.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.util
-
-/**
- * Perform the provided action (via the call method) and, if it returns true,
- * perform it again, the next time. If, however, call returns false, do not
- * perform call the next time, instead wait 2*n calls before actually calling,
- * with n increasing to the maximum specified in the constructor.
-
- * @param maxBackOff Absolute maximum number of calls to call before actually performing call.
- */
-abstract class DoublingBackOff(maxBackOff:Int = 64) {
- var invocationsBeforeCall = 0
- var currentBackOff = 0
-
- /**
- * Method to invoke and whose return value will determine the next time
- * it is called again.
- */
- def call():Boolean
-
- /**
- * Possibly execute the call method, based on the result of the previous run.
- */
- def maybeCall():Unit = {
- if(invocationsBeforeCall == 0) {
- if (call()) {
- // call succeeded so reset backoff
- currentBackOff = 0
- } else {
- // Call failed, so start backing off
- currentBackOff = scala.math.min(maxBackOff, nextBackOff(currentBackOff))
- invocationsBeforeCall = currentBackOff
- }
- } else {
- invocationsBeforeCall -= 1
- }
-
- }
-
- // 2 * 0 == 0, making getting started a wee bit hard, so we need a little help with that first back off
- private def nextBackOff(i:Int) = if(i == 0) 1 else 2 * i
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index 97e65eb..04229a6 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -27,39 +27,153 @@ import org.apache.samza.system.chooser.MessageChooser
import org.apache.samza.system.chooser.DefaultChooser
import org.apache.samza.util.BlockingEnvelopeMap
import org.apache.samza.serializers._
+import org.apache.samza.system.chooser.MockMessageChooser
class TestSystemConsumers {
+ def testPollIntervalMs { // NOTE(review): unlike the other tests here, this method has no @Test annotation, so JUnit will not run it; confirm intent before enabling
+ val numEnvelopes = 1000
+ val system = "test-system"
+ val systemStreamPartition0 = new SystemStreamPartition(system, "some-stream", new Partition(0))
+ val systemStreamPartition1 = new SystemStreamPartition(system, "some-stream", new Partition(1))
+ val envelope = new IncomingMessageEnvelope(systemStreamPartition0, "1", "k", "v")
+ val consumer = new CustomPollResponseSystemConsumer(envelope)
+ var now = 0L
+ val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), clock = () => now)
+
+ consumers.register(systemStreamPartition0, "0")
+ consumers.register(systemStreamPartition1, "1234")
+ consumers.start
+
+ // Tell the consumer to respond with 1000 messages for SSP0, and no
+ // messages for SSP1.
+ consumer.setResponseSizes(numEnvelopes)
+
+ // Choose to trigger a refresh with data.
+ assertNull(consumers.choose)
+ // 2: First on start, second on choose.
+ assertEquals(2, consumer.polls)
+ assertEquals(2, consumer.lastPoll.size)
+ assertTrue(consumer.lastPoll.contains(systemStreamPartition0))
+ assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
+ assertEquals(envelope, consumers.choose)
+ assertEquals(envelope, consumers.choose)
+ // We aren't polling because we're getting non-null envelopes.
+ assertEquals(2, consumer.polls)
+
+ // Advance the clock to trigger a new poll even though there are still
+ // messages.
+ now = SystemConsumers.DEFAULT_POLL_INTERVAL_MS
+
+ assertEquals(envelope, consumers.choose)
+
+ // We polled even though there are still 997 messages in the unprocessed
+ // message buffer.
+ assertEquals(3, consumer.polls)
+ assertEquals(1, consumer.lastPoll.size)
+
+ // Only SSP1 was polled because we still have messages for SSP0.
+ assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
+
+ // Now drain all messages for SSP0. There should be exactly 997 messages,
+ // since we have chosen 3 already, and we started with 1000.
+ (0 until (numEnvelopes - 3)).foreach { i =>
+ assertEquals(envelope, consumers.choose)
+ }
+
+ // Nothing left. Should trigger a poll here.
+ assertNull(consumers.choose)
+ assertEquals(4, consumer.polls)
+ assertEquals(2, consumer.lastPoll.size)
+
+ // Now we ask for messages from both again.
+ assertTrue(consumer.lastPoll.contains(systemStreamPartition0))
+ assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
+ }
+
+ def testBasicSystemConsumersFunctionality { // NOTE(review): no @Test annotation, so JUnit will not run this; each null choose below triggers a refresh, so the poll-count asserts look inconsistent — verify before enabling
+ val system = "test-system"
+ val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1))
+ val envelope = new IncomingMessageEnvelope(systemStreamPartition, "1", "k", "v")
+ val consumer = new CustomPollResponseSystemConsumer(envelope)
+ var now = 0
+ val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), clock = () => now)
+
+ consumers.register(systemStreamPartition, "0")
+ consumers.start
+
+ // Start should trigger a poll to the consumer.
+ assertEquals(1, consumer.polls)
+
+ // Tell the consumer to start returning messages when polled.
+ consumer.setResponseSizes(1)
+
+ // Choose to trigger a refresh with data.
+ assertNull(consumers.choose)
+
+ // Choose should have triggered a second poll, since no messages are available.
+ assertEquals(2, consumer.polls)
+
+ // Choose a few times. Only the first choose has a message to return.
+ assertEquals(envelope, consumers.choose)
+ assertNull(consumers.choose)
+ assertNull(consumers.choose)
+
+ // Return more than one message this time.
+ consumer.setResponseSizes(2)
+
+ // Choose to trigger a refresh with data.
+ assertNull(consumers.choose)
+
+ // Increase clock interval.
+ now = SystemConsumers.DEFAULT_POLL_INTERVAL_MS
+
+ // We get two messages now.
+ assertEquals(envelope, consumers.choose)
+ // Should not poll even though clock interval increases past interval threshold.
+ assertEquals(2, consumer.polls)
+ assertEquals(envelope, consumers.choose)
+ assertNull(consumers.choose)
+ }
+
@Test
def testSystemConumersShouldRegisterStartAndStopChooser {
val system = "test-system"
val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1))
- var started = 0
- var stopped = 0
- var registered = Map[SystemStreamPartition, String]()
+ var consumerStarted = 0
+ var consumerStopped = 0
+ var consumerRegistered = Map[SystemStreamPartition, String]()
+ var chooserStarted = 0
+ var chooserStopped = 0
+ var chooserRegistered = Map[SystemStreamPartition, String]()
val consumer = Map(system -> new SystemConsumer {
- def start {}
- def stop {}
- def register(systemStreamPartition: SystemStreamPartition, offset: String) {}
- def poll(systemStreamPartitions: java.util.Map[SystemStreamPartition, java.lang.Integer], timeout: Long) = List()
+ def start = consumerStarted += 1
+ def stop = consumerStopped += 1
+ def register(systemStreamPartition: SystemStreamPartition, offset: String) = consumerRegistered += systemStreamPartition -> offset
+ def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]()
})
val consumers = new SystemConsumers(new MessageChooser {
def update(envelope: IncomingMessageEnvelope) = Unit
def choose = null
- def start = started += 1
- def stop = stopped += 1
- def register(systemStreamPartition: SystemStreamPartition, offset: String) = registered += systemStreamPartition -> offset
+ def start = chooserStarted += 1
+ def stop = chooserStopped += 1
+ def register(systemStreamPartition: SystemStreamPartition, offset: String) = chooserRegistered += systemStreamPartition -> offset
}, consumer, null)
consumers.register(systemStreamPartition, "0")
consumers.start
consumers.stop
- assertEquals(1, started)
- assertEquals(1, stopped)
- assertEquals(1, registered.size)
- assertEquals("0", registered(systemStreamPartition))
+ assertEquals(1, chooserStarted)
+ assertEquals(1, chooserStopped)
+ assertEquals(1, chooserRegistered.size)
+ assertEquals("0", chooserRegistered(systemStreamPartition))
+
+ assertEquals(1, consumerStarted)
+ assertEquals(1, consumerStopped)
+ assertEquals(1, consumerRegistered.size)
+ assertEquals("0", consumerRegistered(systemStreamPartition))
}
@Test
@@ -76,7 +190,7 @@ class TestSystemConsumers {
def start {}
def stop {}
def register(systemStreamPartition: SystemStreamPartition, offset: String) {}
- def poll(systemStreamPartitions: java.util.Map[SystemStreamPartition, java.lang.Integer], timeout: Long) = List()
+ def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]()
})
val consumers = new SystemConsumers(new MessageChooser {
def update(envelope: IncomingMessageEnvelope) = Unit
@@ -102,16 +216,16 @@ class TestSystemConsumers {
val system = "test-system"
val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1))
val msgChooser = new DefaultChooser
- val consumer = Map(system -> new SimpleConsumer)
+ val consumer = Map(system -> new SerializingConsumer)
val systemMessageSerdes = Map(system -> (new StringSerde("UTF-8")).asInstanceOf[Serde[Object]]);
val serdeManager = new SerdeManager(systemMessageSerdes = systemMessageSerdes)
- // it should throw exceptions when the deserialization has error
+ // throw exceptions when the deserialization has error
val consumers = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = false)
consumers.register(systemStreamPartition, "0")
- consumers.start
consumer(system).putBytesMessage
consumer(system).putStringMessage
+ consumers.start
var caughtRightException = false
try {
@@ -142,10 +256,35 @@ class TestSystemConsumers {
}
/**
- * a simple consumer that provides two extra methods, one is to put bytes format message
- * and the other to put string format message
+ * A simple mock SystemConsumer that records what was polled and lets you
+ * set how many envelopes each poll returns. The envelope used in poll
+ * responses is supplied through the constructor.
+ */
+ private class CustomPollResponseSystemConsumer(envelope: IncomingMessageEnvelope) extends SystemConsumer {
+ /** Number of times poll has been called. */
+ var polls = 0
+ /** Canned response returned by every poll call; empty until setResponseSizes is called. */
+ var pollResponse = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]()
+ /** Partitions passed to the most recent poll call, or null if poll has not been called yet. */
+ var lastPoll: java.util.Set[SystemStreamPartition] = null
+ def start {}
+ def stop {}
+ def register(systemStreamPartition: SystemStreamPartition, offset: String) {}
+ /**
+ * Records the call (count and requested partitions) and returns the
+ * canned pollResponse. The timeout argument is ignored.
+ */
+ def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = {
+ polls += 1
+ lastPoll = systemStreamPartitions
+ pollResponse
+ }
+ /**
+ * Makes subsequent polls return numEnvelopes copies of the constructor's
+ * envelope, keyed by that envelope's SystemStreamPartition.
+ */
+ def setResponseSizes(numEnvelopes: Int) {
+ val q = new java.util.ArrayList[IncomingMessageEnvelope]()
+ (0 until numEnvelopes).foreach { i => q.add(envelope) }
+ // Previously this assignment was immediately overwritten with an empty
+ // map, so polls never returned the configured envelopes.
+ pollResponse = Map(envelope.getSystemStreamPartition -> q)
+ }
+ }
+
+ /**
+ * A simple consumer that provides two extra methods: one puts a message in
+ * bytes format and the other puts a message in string format.
*/
- private class SimpleConsumer extends BlockingEnvelopeMap {
+ private class SerializingConsumer extends BlockingEnvelopeMap {
val systemStreamPartition = new SystemStreamPartition("test-system", "some-stream", new Partition(1))
def putBytesMessage {
put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "0", "0", "test".getBytes()))
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala
deleted file mode 100644
index c96c53b..0000000
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.chooser
-
-import org.apache.samza.system.chooser.ChooserStatus._
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.Partition
-import org.junit.Assert._
-import org.junit.Test
-
-class TestBufferingMessageChooser {
- @Test
- def testShouldBufferAndFlush {
- val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
- val chooser = new MockMessageChooser
- val buffer = new BufferingMessageChooser(chooser)
- val envelope1 = new IncomingMessageEnvelope(ssp1, "1", null, null)
- buffer.register(ssp1, "1")
- assertEquals(1, chooser.registers.size)
- assertEquals("1", chooser.registers.getOrElse(ssp1, "0"))
- buffer.start
- assertEquals(1, chooser.starts)
- assertEquals(null, buffer.choose)
- buffer.update(envelope1)
- // Should buffer this update, rather than passing it to the wrapped chooser.
- assertEquals(null, buffer.choose)
- buffer.flush
- assertEquals(envelope1, buffer.choose)
- assertEquals(null, buffer.choose)
- buffer.stop
- assertEquals(1, chooser.stops)
- }
-
- @Test
- def testBufferShouldSkipCheckedSSPs {
- val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
- val chooser = new MockMessageChooser
- val buffer = new BufferingMessageChooser(chooser)
- val envelope1 = new IncomingMessageEnvelope(ssp1, "1", null, null)
- buffer.register(ssp1, "1")
- buffer.start
- checkChooserStatus(NeededByChooser, ssp1, buffer)
-
- // Buffer first message. Still needed.
- buffer.update(envelope1)
- checkChooserStatus(NeededByChooser, ssp1, buffer)
- assertEquals(null, buffer.choose)
- checkChooserStatus(NeededByChooser, ssp1, buffer)
-
- // Flush first message. Now in chooser.
- buffer.flush
- checkChooserStatus(InChooser, ssp1, buffer)
- assertEquals(envelope1, buffer.choose)
- checkChooserStatus(NeededByChooser, ssp1, buffer)
-
- // Now flush with no messages. Should begin skipping chooser since no
- // messages are available.
- assertEquals(null, buffer.choose)
- checkChooserStatus(NeededByChooser, ssp1, buffer)
- buffer.flush
- checkChooserStatus(SkippingChooser, ssp1, buffer)
-
- // Now check that we can get back to NeededByChooser when a new message
- // arrives.
- buffer.update(envelope1)
- checkChooserStatus(NeededByChooser, ssp1, buffer)
- assertEquals(0, chooser.envelopes.size)
- assertEquals(null, buffer.choose)
-
- // And check that we can get back into the InChooser state.
- buffer.flush
- checkChooserStatus(InChooser, ssp1, buffer)
- assertEquals(envelope1, buffer.choose)
- checkChooserStatus(NeededByChooser, ssp1, buffer)
-
- // Shutdown.
- buffer.stop
- assertEquals(1, chooser.stops)
- }
-
- private def checkChooserStatus(status: ChooserStatus, systemStreamPartition: SystemStreamPartition, buffer: BufferingMessageChooser) {
- if (status.equals(NeededByChooser)) {
- assertEquals(Set(systemStreamPartition), buffer.neededByChooser)
- } else {
- assertTrue(!buffer.neededByChooser.contains(systemStreamPartition))
- }
-
- assertEquals(status, buffer.statuses.getOrElse(systemStreamPartition, null))
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
index b2e04a7..d31c3ce 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
@@ -88,39 +88,42 @@ class TestFileReaderSystemConsumer {
consumer.start
Thread.sleep(500)
- val number: Integer = 1000
- val ssp1Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp1 -> number)
- val ssp2Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp2 -> number)
- val ssp3Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp3 -> number)
- val ssp4Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp4 -> number)
- val ssp5Number: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp5 -> number)
-
- val ssp1Result = consumer.poll(ssp1Number, 1000)
- val ssp2Result = consumer.poll(ssp2Number, 1000)
- val ssp3Result = consumer.poll(ssp3Number, 1000)
- val ssp4Result = consumer.poll(ssp4Number, 1000)
+ val ssp1Result = consumer.poll(Set(ssp1), 1000)
+ val ssp2Result = consumer.poll(Set(ssp2), 1000)
+ val ssp3Result = consumer.poll(Set(ssp3), 1000)
+ val ssp4Result = consumer.poll(Set(ssp4), 1000)
assertEquals(0, ssp1Result.size)
assertEquals(0, ssp2Result.size)
assertEquals(1, ssp3Result.size)
- assertEquals("first line ", ssp3Result(0).getMessage)
- assertEquals("0", ssp3Result(0).getOffset)
+ assertEquals(1, ssp3Result.get(ssp3).size)
+ var envelope = ssp3Result.get(ssp3).remove(0)
+ assertEquals("first line ", envelope.getMessage)
+ assertEquals("0", envelope.getOffset)
assertEquals(1, ssp4Result.size)
- assertEquals("second line ", ssp4Result(0).getMessage)
- assertEquals("12", ssp4Result(0).getOffset)
+ assertEquals(1, ssp4Result.get(ssp4).size)
+ envelope = ssp4Result.get(ssp4).remove(0)
+ assertEquals("second line ", envelope.getMessage)
+ assertEquals("12", envelope.getOffset)
appendFile
Thread.sleep(1000)
// ssp5 should read the new lines
- val ssp5Result = consumer.poll(ssp5Number, 1000)
- assertEquals(3, ssp5Result.size)
- assertEquals("This is a new line", ssp5Result(2).getMessage)
- assertEquals("50", ssp5Result(2).getOffset)
- assertEquals("other lines ", ssp5Result(1).getMessage)
- assertEquals("37", ssp5Result(1).getOffset)
+ val ssp5Result = consumer.poll(Set(ssp5), 1000)
+ assertEquals(1, ssp5Result.size)
+ assertEquals(3, ssp5Result.get(ssp5).size)
+ envelope = ssp5Result.get(ssp5).remove(0)
+ assertEquals("third line ", envelope.getMessage)
+ assertEquals("25", envelope.getOffset)
+ envelope = ssp5Result.get(ssp5).remove(0)
+ assertEquals("other lines ", envelope.getMessage)
+ assertEquals("37", envelope.getOffset)
+ envelope = ssp5Result.get(ssp5).remove(0)
+ assertEquals("This is a new line", envelope.getMessage)
+ assertEquals("50", envelope.getOffset)
consumer.stop
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-core/src/test/scala/org/apache/samza/util/TestDoublingBackOff.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestDoublingBackOff.scala b/samza-core/src/test/scala/org/apache/samza/util/TestDoublingBackOff.scala
deleted file mode 100644
index eaeb005..0000000
--- a/samza-core/src/test/scala/org/apache/samza/util/TestDoublingBackOff.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.util
-
-import org.junit.Test
-import org.junit.Assert._
-
-class TestDoublingBackOff {
- @Test def zeroBackOffWorks() {
- var counter = 0
- val zeroBackOff = new DoublingBackOff(0) {
- def call(): Boolean = {
- counter += 1
- true
- }
- }
-
- for(i <- 0 to 1000) {
- assertEquals(i, counter)
- zeroBackOff.maybeCall()
- }
- }
-
- @Test def backOffWorks() {
- val toReturn = List(true, false, true, true)
- var counter = 0
- val ebo = new DoublingBackOff() {
- def call(): Boolean = {
- counter += 1
- toReturn(counter - 1)
- }
- }
-
- ebo.maybeCall() // will get back true
- assertEquals(1, counter)
- ebo.maybeCall() // will get back false
- assertEquals(2, counter)
- ebo.maybeCall() // last false means we won't actually call, will hold off for one iteration
- assertEquals(2, counter)
- ebo.maybeCall() // and call on this one, which gives back true
- assertEquals(3, counter)
- ebo.maybeCall() // so we immediately call again and increment
- assertEquals(4, counter)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
index 23d122e..1661b43 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
@@ -74,7 +74,7 @@ class TestPerformanceTask extends StreamTask with InitableTask with Logging {
/**
* How many messages to process before shutting down.
*/
- var maxMessages = 100000
+ var maxMessages = 10000000
/**
* If defined, incoming messages will be forwarded to this SystemStream. If
@@ -84,7 +84,7 @@ class TestPerformanceTask extends StreamTask with InitableTask with Logging {
def init(config: Config, context: TaskContext) {
logInterval = config.getInt("task.log.interval", 10000)
- maxMessages = config.getInt("task.max.messages", 100000)
+ maxMessages = config.getInt("task.max.messages", 10000000)
outputSystemStream = Option(config.get("task.outputs", null)).map(Util.getSystemStreamFromNames(_))
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7cecf0ae/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
index 4016768..1f4c247 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
@@ -75,7 +75,7 @@ class TestSamzaContainerPerformance extends Logging{
val partitionsPerStreamCount = System.getProperty("samza.mock.partitions.per.stream", "4").toInt
val brokerSleepMs = System.getProperty("samza.mock.broker.sleep.ms", "1").toInt
var logInterval = System.getProperty("samza.task.log.interval", "10000").toInt
- var maxMessages = System.getProperty("samza.task.max.messages", "100000").toInt
+ var maxMessages = System.getProperty("samza.task.max.messages", "10000000").toInt
val jobConfig = Map(
"job.factory.class" -> "org.apache.samza.job.local.LocalJobFactory",