You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jg...@apache.org on 2014/12/15 19:35:14 UTC

incubator-samza git commit: SAMZA-163: Remove mutable.ConcurrentMap when we stop supporting 2.9

Repository: incubator-samza
Updated Branches:
  refs/heads/master d497a4079 -> 3161c640a


SAMZA-163: Remove mutable.ConcurrentMap when we stop supporting 2.9


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/3161c640
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/3161c640
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/3161c640

Branch: refs/heads/master
Commit: 3161c640a4cfada839668054d9e8fd8b9b6cd046
Parents: d497a40
Author: Jakob Homan <jg...@gmail.com>
Authored: Mon Dec 15 10:35:15 2014 -0800
Committer: Jakob Homan <jg...@gmail.com>
Committed: Mon Dec 15 10:35:15 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/samza/metrics/Snapshot.java    |  1 +
 .../apache/samza/system/kafka/BrokerProxy.scala    | 17 +++++++++--------
 .../samza/util/ClientUtilTopicMetadataStore.scala  |  3 ---
 .../samza/test/integration/SimpleStatefulTask.java |  3 ++-
 .../samza/test/integration/StatePerfTestTask.java  |  3 ++-
 .../samza/test/integration/join/Checker.java       |  1 +
 6 files changed, 15 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java b/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java
index 7666909..4c7b525 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java
@@ -90,6 +90,7 @@ public class Snapshot {
    *
    * @return the list of values
    */
+  @SuppressWarnings("unchecked")
   public ArrayList<Long> getValues() {
     return (ArrayList<Long>) values.clone();
   }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index ba1aa9b..9daf824 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -21,18 +21,19 @@
 
 package org.apache.samza.system.kafka
 
+import java.nio.channels.ClosedByInterruptException
+import java.util.Map.Entry
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
 import kafka.api._
 import kafka.common.{NotLeaderForPartitionException, UnknownTopicOrPartitionException, ErrorMapping, TopicAndPartition}
-import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
-import scala.collection.JavaConversions._
+import kafka.consumer.ConsumerConfig
 import kafka.message.MessageSet
+import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.util.Logging
-import java.nio.channels.ClosedByInterruptException
-import java.util.Map.Entry
-import scala.collection.mutable
-import kafka.consumer.ConsumerConfig
 import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX
-import org.apache.samza.util.ExponentialSleepStrategy
+import scala.collection.JavaConversions._
+import scala.collection.concurrent
+import scala.collection.mutable
 
 /**
  *  Companion object for class JvmMetrics encapsulating various constants
@@ -65,7 +66,7 @@ class BrokerProxy(
   val sleepMSWhileNoTopicPartitions = 100
 
   /** What's the next offset for a particular partition? **/
-  val nextOffsets:mutable.ConcurrentMap[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]()
+  val nextOffsets:concurrent.Map[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]()
 
   /** Block on the first call to get message if the fetcher has not yet returned its initial results **/
   // TODO: It should be sufficient to just use the count down latch and await on it for each of the calls, but

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala b/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
index 1f415d2..0f91622 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
@@ -22,9 +22,6 @@ package org.apache.samza.util
 import kafka.api.{ TopicMetadataResponse, TopicMetadata }
 import org.apache.samza.SamzaException
 import kafka.client.ClientUtils
-import org.apache.samza.util.Logging
-import kafka.common.ErrorMapping
-import kafka.cluster.Broker
 import java.util.concurrent.atomic.AtomicInteger
 
 trait TopicMetadataStore extends Logging {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
index 52a8059..4dbcb75 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
@@ -34,7 +34,8 @@ import org.apache.samza.task.TaskCoordinator.RequestScope;
 public class SimpleStatefulTask implements StreamTask, InitableTask {
   
   private KeyValueStore<String, String> store;
-  
+
+  @SuppressWarnings("unchecked")
   public void init(Config config, TaskContext context) {
     this.store = (KeyValueStore<String, String>) context.getStore("mystore");
     System.out.println("Contents of store: ");

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
index 25417a6..77f770e 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
@@ -35,7 +35,8 @@ public class StatePerfTestTask implements StreamTask, InitableTask {
   private int count = 0;
   private int LOG_INTERVAL = 100000;
   private long start = System.currentTimeMillis();
-  
+
+  @SuppressWarnings("unchecked")
   public void init(Config config, TaskContext context) {
     this.store = (KeyValueStore<String, String>) context.getStore("mystore");
   }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
index 3012b19..2a2177a 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
@@ -41,6 +41,7 @@ public class Checker implements StreamTask, WindowableTask, InitableTask {
   private int numPartitions;
   
   @Override
+  @SuppressWarnings("unchecked")
   public void init(Config config, TaskContext context) {
     this.store = (KeyValueStore<String, String>) context.getStore("checker-state");
     this.expectedKeys = config.getInt("expected.keys");