You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/08/20 03:29:09 UTC
git commit: rename IncomingMessageEnvelopePicker to MessageChooser
Updated Branches:
refs/heads/master e6e973897 -> 9e952ac3d
rename IncomingMessageEnvelopePicker to MessageChooser
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/9e952ac3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/9e952ac3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/9e952ac3
Branch: refs/heads/master
Commit: 9e952ac3d8deb49de9f0c6958e4673617298899b
Parents: e6e9738
Author: Chris Riccomini <cr...@gmail.com>
Authored: Mon Aug 19 18:28:42 2013 -0700
Committer: Chris Riccomini <cr...@gmail.com>
Committed: Mon Aug 19 18:28:42 2013 -0700
----------------------------------------------------------------------
.../system/IncomingMessageEnvelopePicker.java | 26 --------------
.../org/apache/samza/system/MessageChooser.java | 26 ++++++++++++++
.../apache/samza/container/SamzaContainer.scala | 12 +++----
.../apache/samza/system/DefaultChooser.scala | 28 +++++++++++++++
.../org/apache/samza/system/DefaultPicker.scala | 28 ---------------
.../apache/samza/system/SystemConsumers.scala | 36 ++++++++++----------
6 files changed, 78 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9e952ac3/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelopePicker.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelopePicker.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelopePicker.java
deleted file mode 100644
index c6fd1d8..0000000
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelopePicker.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system;
-
-public interface IncomingMessageEnvelopePicker {
- void update(IncomingMessageEnvelope envelopes);
-
- IncomingMessageEnvelope pick();
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9e952ac3/samza-api/src/main/java/org/apache/samza/system/MessageChooser.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/MessageChooser.java b/samza-api/src/main/java/org/apache/samza/system/MessageChooser.java
new file mode 100644
index 0000000..306b290
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/MessageChooser.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+public interface MessageChooser {
+ void update(IncomingMessageEnvelope envelopes);
+
+ IncomingMessageEnvelope choose();
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9e952ac3/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 2243b5c..2d2efdd 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -53,7 +53,7 @@ import org.apache.samza.util.Util
import org.apache.samza.task.ReadableCoordinator
import org.apache.samza.system.SystemProducers
import org.apache.samza.task.ReadableCollector
-import org.apache.samza.system.DefaultPicker
+import org.apache.samza.system.DefaultChooser
import org.apache.samza.system.SystemConsumers
object SamzaContainer extends Logging {
@@ -227,9 +227,9 @@ object SamzaContainer extends Logging {
val jvm = new JvmMetrics(samzaContainerMetrics.registry)
- info("Setting up incoming message envelope picker.")
+ info("Setting up message chooser.")
- val picker = new DefaultPicker
+ val chooser = new DefaultChooser
info("Setting up metrics reporters.")
@@ -259,7 +259,7 @@ object SamzaContainer extends Logging {
val consumerMultiplexer = new SystemConsumers(
// TODO add config values for no new message timeout and max msgs per stream partition
- picker = picker,
+ chooser = chooser,
consumers = consumers,
serdeManager = serdeManager)
@@ -528,9 +528,9 @@ class SamzaContainer(
}
def process(coordinator: ReadableCoordinator) {
- trace("Attempting to pick a message to process.")
+ trace("Attempting to choose a message to process.")
- val envelope = consumerMultiplexer.pick
+ val envelope = consumerMultiplexer.choose
if (envelope != null) {
val partition = envelope.getSystemStreamPartition.getPartition
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9e952ac3/samza-core/src/main/scala/org/apache/samza/system/DefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/DefaultChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/DefaultChooser.scala
new file mode 100644
index 0000000..5a72e7a
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/DefaultChooser.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import java.util.ArrayDeque
+
+class DefaultChooser extends MessageChooser {
+ var q = new ArrayDeque[IncomingMessageEnvelope]()
+ def update(envelope: IncomingMessageEnvelope) = q.add(envelope)
+ def choose = q.poll
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9e952ac3/samza-core/src/main/scala/org/apache/samza/system/DefaultPicker.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/DefaultPicker.scala b/samza-core/src/main/scala/org/apache/samza/system/DefaultPicker.scala
deleted file mode 100644
index 8bad75c..0000000
--- a/samza-core/src/main/scala/org/apache/samza/system/DefaultPicker.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system
-
-import java.util.ArrayDeque
-
-class DefaultPicker extends IncomingMessageEnvelopePicker {
- var q = new ArrayDeque[IncomingMessageEnvelope]()
- def update(envelope: IncomingMessageEnvelope) = q.add(envelope)
- def pick = q.poll
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9e952ac3/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 2e6f3b8..b18f0cc 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -25,7 +25,7 @@ import grizzled.slf4j.Logging
import org.apache.samza.serializers.SerdeManager
class SystemConsumers(
- picker: IncomingMessageEnvelopePicker,
+ chooser: MessageChooser,
consumers: Map[String, SystemConsumer],
serdeManager: SerdeManager,
maxMsgsPerStreamPartition: Int = 1000,
@@ -34,7 +34,7 @@ class SystemConsumers(
// TODO add metrics
var unprocessedMessages = Map[SystemStreamPartition, Queue[IncomingMessageEnvelope]]()
- var neededByPicker = Set[SystemStreamPartition]()
+ var neededByChooser = Set[SystemStreamPartition]()
var fetchMap = Map[SystemStreamPartition, java.lang.Integer]()
var timeout = noNewMessagesTimeout
@@ -57,47 +57,47 @@ class SystemConsumers(
def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: String) {
debug("Registering stream: %s, %s" format (systemStreamPartition, lastReadOffset))
- neededByPicker += systemStreamPartition
+ neededByChooser += systemStreamPartition
fetchMap += systemStreamPartition -> maxMsgsPerStreamPartition
unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]()
consumers(systemStreamPartition.getSystem).register(systemStreamPartition, lastReadOffset)
}
- def pick = {
- val picked = picker.pick
+ def choose = {
+ val envelopeFromChooser = chooser.choose
- if (picked == null) {
- debug("Picker returned null.")
+ if (envelopeFromChooser == null) {
+ debug("Chooser returned null.")
- // Allow blocking if the picker didn't pick a message.
+ // Allow blocking if the chooser didn't choose a message.
timeout = noNewMessagesTimeout
} else {
- debug("Picker returned an incoming message envelope: %s" format picked)
+ debug("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
// Don't block if we have a message to process.
timeout = 0
- // Ok to give the picker a new message from this stream.
- neededByPicker += picked.getSystemStreamPartition
+ // Ok to give the chooser a new message from this stream.
+ neededByChooser += envelopeFromChooser.getSystemStreamPartition
}
refresh
- picked
+ envelopeFromChooser
}
private def refresh {
- debug("Refreshing picker with new messages.")
+ debug("Refreshing chooser with new messages.")
// Poll every system for new messages.
consumers.keys.foreach(poll(_))
- // Update the picker.
- neededByPicker.foreach(systemStreamPartition =>
- // If we have messages for a stream that the picker needs, then update.
+ // Update the chooser.
+ neededByChooser.foreach(systemStreamPartition =>
+ // If we have messages for a stream that the chooser needs, then update.
if (fetchMap(systemStreamPartition).intValue < maxMsgsPerStreamPartition) {
- picker.update(unprocessedMessages(systemStreamPartition).dequeue)
+ chooser.update(unprocessedMessages(systemStreamPartition).dequeue)
fetchMap += systemStreamPartition -> (fetchMap(systemStreamPartition).intValue + 1)
- neededByPicker -= systemStreamPartition
+ neededByChooser -= systemStreamPartition
})
}