You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jg...@apache.org on 2014/12/15 19:35:14 UTC
incubator-samza git commit: SAMZA-163: Remove mutable.ConcurrentMap when we stop supporting 2.9
Repository: incubator-samza
Updated Branches:
refs/heads/master d497a4079 -> 3161c640a
SAMZA-163: Remove mutable.ConcurrentMap when we stop supporting 2.9
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/3161c640
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/3161c640
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/3161c640
Branch: refs/heads/master
Commit: 3161c640a4cfada839668054d9e8fd8b9b6cd046
Parents: d497a40
Author: Jakob Homan <jg...@gmail.com>
Authored: Mon Dec 15 10:35:15 2014 -0800
Committer: Jakob Homan <jg...@gmail.com>
Committed: Mon Dec 15 10:35:15 2014 -0800
----------------------------------------------------------------------
.../java/org/apache/samza/metrics/Snapshot.java | 1 +
.../apache/samza/system/kafka/BrokerProxy.scala | 17 +++++++++--------
.../samza/util/ClientUtilTopicMetadataStore.scala | 3 ---
.../samza/test/integration/SimpleStatefulTask.java | 3 ++-
.../samza/test/integration/StatePerfTestTask.java | 3 ++-
.../samza/test/integration/join/Checker.java | 1 +
6 files changed, 15 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java b/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java
index 7666909..4c7b525 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java
@@ -90,6 +90,7 @@ public class Snapshot {
*
* @return the list of values
*/
+ @SuppressWarnings("unchecked")
public ArrayList<Long> getValues() {
return (ArrayList<Long>) values.clone();
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index ba1aa9b..9daf824 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -21,18 +21,19 @@
package org.apache.samza.system.kafka
+import java.nio.channels.ClosedByInterruptException
+import java.util.Map.Entry
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
import kafka.api._
import kafka.common.{NotLeaderForPartitionException, UnknownTopicOrPartitionException, ErrorMapping, TopicAndPartition}
-import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
-import scala.collection.JavaConversions._
+import kafka.consumer.ConsumerConfig
import kafka.message.MessageSet
+import org.apache.samza.util.ExponentialSleepStrategy
import org.apache.samza.util.Logging
-import java.nio.channels.ClosedByInterruptException
-import java.util.Map.Entry
-import scala.collection.mutable
-import kafka.consumer.ConsumerConfig
import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX
-import org.apache.samza.util.ExponentialSleepStrategy
+import scala.collection.JavaConversions._
+import scala.collection.concurrent
+import scala.collection.mutable
/**
* Companion object for class JvmMetrics encapsulating various constants
@@ -65,7 +66,7 @@ class BrokerProxy(
val sleepMSWhileNoTopicPartitions = 100
/** What's the next offset for a particular partition? **/
- val nextOffsets:mutable.ConcurrentMap[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]()
+ val nextOffsets:concurrent.Map[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]()
/** Block on the first call to get message if the fetcher has not yet returned its initial results **/
// TODO: It should be sufficient to just use the count down latch and await on it for each of the calls, but
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala b/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
index 1f415d2..0f91622 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
@@ -22,9 +22,6 @@ package org.apache.samza.util
import kafka.api.{ TopicMetadataResponse, TopicMetadata }
import org.apache.samza.SamzaException
import kafka.client.ClientUtils
-import org.apache.samza.util.Logging
-import kafka.common.ErrorMapping
-import kafka.cluster.Broker
import java.util.concurrent.atomic.AtomicInteger
trait TopicMetadataStore extends Logging {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
index 52a8059..4dbcb75 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
@@ -34,7 +34,8 @@ import org.apache.samza.task.TaskCoordinator.RequestScope;
public class SimpleStatefulTask implements StreamTask, InitableTask {
private KeyValueStore<String, String> store;
-
+
+ @SuppressWarnings("unchecked")
public void init(Config config, TaskContext context) {
this.store = (KeyValueStore<String, String>) context.getStore("mystore");
System.out.println("Contents of store: ");
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
index 25417a6..77f770e 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
@@ -35,7 +35,8 @@ public class StatePerfTestTask implements StreamTask, InitableTask {
private int count = 0;
private int LOG_INTERVAL = 100000;
private long start = System.currentTimeMillis();
-
+
+ @SuppressWarnings("unchecked")
public void init(Config config, TaskContext context) {
this.store = (KeyValueStore<String, String>) context.getStore("mystore");
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
index 3012b19..2a2177a 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
@@ -41,6 +41,7 @@ public class Checker implements StreamTask, WindowableTask, InitableTask {
private int numPartitions;
@Override
+ @SuppressWarnings("unchecked")
public void init(Config config, TaskContext context) {
this.store = (KeyValueStore<String, String>) context.getStore("checker-state");
this.expectedKeys = config.getInt("expected.keys");