You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/08/20 03:29:09 UTC

git commit: rename IncomingMessageEnvelopePicker to MessageChooser

Updated Branches:
  refs/heads/master e6e973897 -> 9e952ac3d


rename IncomingMessageEnvelopePicker to MessageChooser


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/9e952ac3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/9e952ac3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/9e952ac3

Branch: refs/heads/master
Commit: 9e952ac3d8deb49de9f0c6958e4673617298899b
Parents: e6e9738
Author: Chris Riccomini <cr...@gmail.com>
Authored: Mon Aug 19 18:28:42 2013 -0700
Committer: Chris Riccomini <cr...@gmail.com>
Committed: Mon Aug 19 18:28:42 2013 -0700

----------------------------------------------------------------------
 .../system/IncomingMessageEnvelopePicker.java   | 26 --------------
 .../org/apache/samza/system/MessageChooser.java | 26 ++++++++++++++
 .../apache/samza/container/SamzaContainer.scala | 12 +++----
 .../apache/samza/system/DefaultChooser.scala    | 28 +++++++++++++++
 .../org/apache/samza/system/DefaultPicker.scala | 28 ---------------
 .../apache/samza/system/SystemConsumers.scala   | 36 ++++++++++----------
 6 files changed, 78 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9e952ac3/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelopePicker.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelopePicker.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelopePicker.java
deleted file mode 100644
index c6fd1d8..0000000
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelopePicker.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system;
-
-public interface IncomingMessageEnvelopePicker {
-  void update(IncomingMessageEnvelope envelopes);
-
-  IncomingMessageEnvelope pick();
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9e952ac3/samza-api/src/main/java/org/apache/samza/system/MessageChooser.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/MessageChooser.java b/samza-api/src/main/java/org/apache/samza/system/MessageChooser.java
new file mode 100644
index 0000000..306b290
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/MessageChooser.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+public interface MessageChooser {
+  void update(IncomingMessageEnvelope envelopes);
+
+  IncomingMessageEnvelope choose();
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9e952ac3/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 2243b5c..2d2efdd 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -53,7 +53,7 @@ import org.apache.samza.util.Util
 import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.system.SystemProducers
 import org.apache.samza.task.ReadableCollector
-import org.apache.samza.system.DefaultPicker
+import org.apache.samza.system.DefaultChooser
 import org.apache.samza.system.SystemConsumers
 
 object SamzaContainer extends Logging {
@@ -227,9 +227,9 @@ object SamzaContainer extends Logging {
 
     val jvm = new JvmMetrics(samzaContainerMetrics.registry)
 
-    info("Setting up incoming message envelope picker.")
+    info("Setting up message chooser.")
 
-    val picker = new DefaultPicker
+    val chooser = new DefaultChooser
 
     info("Setting up metrics reporters.")
 
@@ -259,7 +259,7 @@ object SamzaContainer extends Logging {
 
     val consumerMultiplexer = new SystemConsumers(
       // TODO add config values for no new message timeout and max msgs per stream partition
-      picker = picker,
+      chooser = chooser,
       consumers = consumers,
       serdeManager = serdeManager)
 
@@ -528,9 +528,9 @@ class SamzaContainer(
   }
 
   def process(coordinator: ReadableCoordinator) {
-    trace("Attempting to pick a message to process.")
+    trace("Attempting to choose a message to process.")
 
-    val envelope = consumerMultiplexer.pick
+    val envelope = consumerMultiplexer.choose
 
     if (envelope != null) {
       val partition = envelope.getSystemStreamPartition.getPartition

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9e952ac3/samza-core/src/main/scala/org/apache/samza/system/DefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/DefaultChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/DefaultChooser.scala
new file mode 100644
index 0000000..5a72e7a
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/DefaultChooser.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import java.util.ArrayDeque
+
+class DefaultChooser extends MessageChooser {
+  var q = new ArrayDeque[IncomingMessageEnvelope]()
+  def update(envelope: IncomingMessageEnvelope) = q.add(envelope)
+  def choose = q.poll
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9e952ac3/samza-core/src/main/scala/org/apache/samza/system/DefaultPicker.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/DefaultPicker.scala b/samza-core/src/main/scala/org/apache/samza/system/DefaultPicker.scala
deleted file mode 100644
index 8bad75c..0000000
--- a/samza-core/src/main/scala/org/apache/samza/system/DefaultPicker.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system
-
-import java.util.ArrayDeque
-
-class DefaultPicker extends IncomingMessageEnvelopePicker {
-  var q = new ArrayDeque[IncomingMessageEnvelope]()
-  def update(envelope: IncomingMessageEnvelope) = q.add(envelope)
-  def pick = q.poll
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9e952ac3/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 2e6f3b8..b18f0cc 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -25,7 +25,7 @@ import grizzled.slf4j.Logging
 import org.apache.samza.serializers.SerdeManager
 
 class SystemConsumers(
-  picker: IncomingMessageEnvelopePicker,
+  chooser: MessageChooser,
   consumers: Map[String, SystemConsumer],
   serdeManager: SerdeManager,
   maxMsgsPerStreamPartition: Int = 1000,
@@ -34,7 +34,7 @@ class SystemConsumers(
   // TODO add metrics
 
   var unprocessedMessages = Map[SystemStreamPartition, Queue[IncomingMessageEnvelope]]()
-  var neededByPicker = Set[SystemStreamPartition]()
+  var neededByChooser = Set[SystemStreamPartition]()
   var fetchMap = Map[SystemStreamPartition, java.lang.Integer]()
   var timeout = noNewMessagesTimeout
 
@@ -57,47 +57,47 @@ class SystemConsumers(
   def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: String) {
     debug("Registering stream: %s, %s" format (systemStreamPartition, lastReadOffset))
 
-    neededByPicker += systemStreamPartition
+    neededByChooser += systemStreamPartition
     fetchMap += systemStreamPartition -> maxMsgsPerStreamPartition
     unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]()
     consumers(systemStreamPartition.getSystem).register(systemStreamPartition, lastReadOffset)
   }
 
-  def pick = {
-    val picked = picker.pick
+  def choose = {
+    val envelopeFromChooser = chooser.choose
 
-    if (picked == null) {
-      debug("Picker returned null.")
+    if (envelopeFromChooser == null) {
+      debug("Chooser returned null.")
 
-      // Allow blocking if the picker didn't pick a message.
+      // Allow blocking if the chooser didn't choose a message.
       timeout = noNewMessagesTimeout
     } else {
-      debug("Picker returned an incoming message envelope: %s" format picked)
+      debug("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
 
       // Don't block if we have a message to process.
       timeout = 0
 
-      // Ok to give the picker a new message from this stream.
-      neededByPicker += picked.getSystemStreamPartition
+      // Ok to give the chooser a new message from this stream.
+      neededByChooser += envelopeFromChooser.getSystemStreamPartition
     }
 
     refresh
-    picked
+    envelopeFromChooser
   }
 
   private def refresh {
-    debug("Refreshing picker with new messages.")
+    debug("Refreshing chooser with new messages.")
 
     // Poll every system for new messages.
     consumers.keys.foreach(poll(_))
 
-    // Update the picker.
-    neededByPicker.foreach(systemStreamPartition =>
-      // If we have messages for a stream that the picker needs, then update.
+    // Update the chooser.
+    neededByChooser.foreach(systemStreamPartition =>
+      // If we have messages for a stream that the chooser needs, then update.
       if (fetchMap(systemStreamPartition).intValue < maxMsgsPerStreamPartition) {
-        picker.update(unprocessedMessages(systemStreamPartition).dequeue)
+        chooser.update(unprocessedMessages(systemStreamPartition).dequeue)
         fetchMap += systemStreamPartition -> (fetchMap(systemStreamPartition).intValue + 1)
-        neededByPicker -= systemStreamPartition
+        neededByChooser -= systemStreamPartition
       })
   }