You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jg...@apache.org on 2014/09/15 21:47:30 UTC

git commit: SAMZA-413: Correct and expand BlockingEnvelopeMap javadoc

Repository: incubator-samza
Updated Branches:
  refs/heads/master 87ce08e02 -> 9b7e451c8


SAMZA-413: Correct and expand BlockingEnvelopeMap javadoc


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/9b7e451c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/9b7e451c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/9b7e451c

Branch: refs/heads/master
Commit: 9b7e451c82e9f6721c671f17a06250d0138194a1
Parents: 87ce08e
Author: Jakob Homan <jg...@gmail.com>
Authored: Mon Sep 15 11:54:31 2014 -0700
Committer: Jakob Homan <jg...@gmail.com>
Committed: Mon Sep 15 11:54:31 2014 -0700

----------------------------------------------------------------------
 .../apache/samza/util/BlockingEnvelopeMap.java  | 41 +++++++++++++++++---
 1 file changed, 35 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b7e451c/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 317e073..e30321d 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -49,13 +49,16 @@ import org.apache.samza.system.SystemStreamPartition;
  * 
  * <p>
  * SystemConsumers that implement BlockingEnvelopeMap need to add messages using
- * add (or addAll), and update noMoreMessage using setIsAtHead. The
- * noMoreMessage variable is used to determine whether a SystemStreamPartition
- * is "caught up" (has read all possible messages from the underlying system).
- * For example, with a Kafka system, noMoreMessages would be set to true when
- * the last message offset returned is equal to the offset high watermark for a
- * given topic/partition.
+ * {@link #put(org.apache.samza.system.SystemStreamPartition, org.apache.samza.system.IncomingMessageEnvelope) put}
+ * (or {@link #putAll(org.apache.samza.system.SystemStreamPartition, java.util.List) putAll}),
+ * and update noMoreMessage using setIsAtHead. The noMoreMessage variable is used
+ * to determine whether a SystemStreamPartition is "caught up" (has read all
+ * possible messages from the underlying system). For example, with a Kafka
+ * system, noMoreMessage would be set to true when the last message offset
+ * returned is equal to the offset high watermark for a given topic/partition.
  * </p>
+ * The BlockingEnvelopeMap is backed by a concurrent map, which allows concurrent
+ * put or putAll calls to be thread safe without external synchronization.
  */
 public abstract class BlockingEnvelopeMap implements SystemConsumer {
   private final BlockingEnvelopeMapMetrics metrics;
@@ -91,6 +94,9 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
     this.clock = clock;
   }
 
+  /**
+   * {@inheritDoc}
+   */
   public void register(SystemStreamPartition systemStreamPartition, String offset) {
     metrics.initMetrics(systemStreamPartition);
     bufferedMessages.putIfAbsent(systemStreamPartition, newBlockingQueue());
@@ -100,6 +106,9 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
     return new LinkedBlockingQueue<IncomingMessageEnvelope>();
   }
 
+  /**
+   * {@inheritDoc}
+   */
   public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
     long stopTime = clock.currentTimeMillis() + timeout;
     Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messagesToReturn = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
@@ -147,10 +156,30 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
     return messagesToReturn;
   }
 
+  /**
+   * Place a new {@link org.apache.samza.system.IncomingMessageEnvelope} on the
+   * queue for the specified {@link org.apache.samza.system.SystemStreamPartition}.
+   *
+   * @param systemStreamPartition SystemStreamPartition that owns the envelope
+   * @param envelope Message for specified SystemStreamPartition
+   * @throws InterruptedException if the calling thread is interrupted while adding to the underlying blocking queue
+   */
   protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException {
     bufferedMessages.get(systemStreamPartition).put(envelope);
   }
 
+  /**
+   * Place a collection of {@link org.apache.samza.system.IncomingMessageEnvelope}
+   * on the queue for the specified {@link org.apache.samza.system.SystemStreamPartition}.
+   * <p>
+   * Insertion of all the messages into the queue is not guaranteed to be done
+   * atomically.
+   * </p>
+   *
+   * @param systemStreamPartition SystemStreamPartition that owns the envelopes
+   * @param envelopes Messages for specified SystemStreamPartition
+   * @throws InterruptedException from underlying concurrent collection
+   */
   protected void putAll(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> envelopes) throws InterruptedException {
     BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);