You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/04/29 01:03:50 UTC

samza git commit: SAMZA-938: reverting the APIs added in SAMZA-813 to keep backward compatibility in 0.10.1

Repository: samza
Updated Branches:
  refs/heads/master efa50a790 -> 373181c57


SAMZA-938: reverting the APIs added in SAMZA-813 to keep backward compatibility in 0.10.1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/373181c5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/373181c5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/373181c5

Branch: refs/heads/master
Commit: 373181c5792b34d2c9c86ad1859cb01722584992
Parents: efa50a7
Author: Jacob Maes <ja...@gmail.com>
Authored: Thu Apr 28 16:02:47 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu Apr 28 16:02:47 2016 -0700

----------------------------------------------------------------------
 .../samza/storage/kv/KeyValueIterator.java       |  2 --
 .../apache/samza/storage/kv/KeyValueStore.java   |  7 -------
 .../kv/inmemory/InMemoryKeyValueStore.scala      | 16 ----------------
 .../samza/storage/kv/RocksDbKeyValueStore.scala  | 19 +------------------
 .../apache/samza/storage/kv/CachedStore.scala    | 10 ----------
 .../samza/storage/kv/CachedStoreMetrics.scala    |  1 -
 .../samza/storage/kv/KeyValueStorageEngine.scala |  5 -----
 .../kv/KeyValueStorageEngineMetrics.scala        |  1 -
 .../samza/storage/kv/KeyValueStoreMetrics.scala  |  1 -
 .../apache/samza/storage/kv/LoggedStore.scala    |  5 -----
 .../samza/storage/kv/LoggedStoreMetrics.scala    |  1 -
 .../samza/storage/kv/NullSafeKeyValueStore.scala |  4 ----
 .../storage/kv/SerializedKeyValueStore.scala     |  9 ---------
 .../kv/SerializedKeyValueStoreMetrics.scala      |  1 -
 .../samza/storage/kv/MockKeyValueStore.scala     | 11 -----------
 15 files changed, 1 insertion(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
index aa1f88c..854ebbf 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
@@ -23,6 +23,4 @@ import java.util.Iterator;
 
 public interface KeyValueIterator<K, V> extends Iterator<Entry<K, V>> {
   public void close();
-  public void seekToFirst();
-  public void seek(K key);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
index 7d4d353..b1fea7b 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
@@ -103,13 +103,6 @@ public interface KeyValueStore<K, V> {
   KeyValueIterator<K, V> all();
 
   /**
-   * Return an iterator which is yet to be positions. This iterator must be positioned
-   * first before a call to next() is made. This iterator MUST be closed after use.
-   * @return An iterator
-   */
-  KeyValueIterator<K, V> newIterator();
-
-  /**
    * Closes this key-value store, if applicable, relinquishing any underlying resources.
    */
   void close();

http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
index 661a835..72f25a3 100644
--- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
@@ -61,18 +61,6 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
     }
 
     override def hasNext: Boolean = iter.hasNext
-
-    /*
-    * This method is supposed to be called only after an iterator is created first
-    * using the store's newIterator position. For some stores, the creation of the
-    * */
-    override def seekToFirst(): Unit = {
-        throw new UnsupportedOperationException
-    }
-
-    override def seek(key: Array[Byte]): Unit = {
-        throw new UnsupportedOperationException
-    }
   }
 
   override def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
@@ -132,8 +120,4 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
   override def getAll(keys: java.util.List[Array[Byte]]): java.util.Map[Array[Byte], Array[Byte]] = {
     KeyValueStore.Extension.getAll(this, keys);
   }
-
-  override def newIterator(): KeyValueIterator[Array[Byte], Array[Byte]] = {
-    throw new UnsupportedOperationException
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index b896810..f0965ae 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -194,11 +194,6 @@ class RocksDbKeyValueStore(
     new RocksDbIterator(iter)
   }
 
-  def newIterator(): KeyValueIterator[Array[Byte], Array[Byte]] = {
-      metrics.newIterator.inc
-      new RocksDbIterator(db.newIterator);
-  }
-
   def flush {
     metrics.flushes.inc
     trace("Flushing.")
@@ -211,7 +206,7 @@ class RocksDbKeyValueStore(
   }
 
   class RocksDbIterator(iter: RocksIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] {
-    private var open = iter.isValid
+    private var open = true
     private var firstValueAccessed = false;
     def close() = {
       open = false
@@ -253,18 +248,6 @@ class RocksDbKeyValueStore(
       entry
     }
 
-    def seekToFirst: Unit = {
-        metrics.alls.inc()
-        iter.seekToFirst()
-        open = true
-    }
-
-    def seek(target: Array[Byte]): Unit = {
-        metrics.alls.inc()
-        iter.seek(target)
-        open = true
-    }
-
     override def finalize() {
       if (open) {
         trace("Leaked reference to RocksDB iterator, forcing close.")

http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 4c4e82e..c28f8db 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -124,10 +124,6 @@ class CachedStore[K, V](
     }
 
     override def hasNext: Boolean = iter.hasNext
-
-    override def seekToFirst(): Unit = iter.seekToFirst()
-
-    override def seek(key: K): Unit = iter.seek(key)
   }
 
   override def range(from: K, to: K): KeyValueIterator[K, V] = {
@@ -144,12 +140,6 @@ class CachedStore[K, V](
     new CachedStoreIterator(store.all())
   }
 
-  override def newIterator(): KeyValueIterator[K, V] = {
-    metrics.newIterator.inc
-    flush()
-    new CachedStoreIterator(store.newIterator())
-  }
-
   override def put(key: K, value: V) {
     metrics.puts.inc
 

http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
index 04109c9..a11b5fe 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
@@ -35,7 +35,6 @@ class CachedStoreMetrics(
   val deletes = newCounter("deletes")
   val flushes = newCounter("flushes")
   val putAllDirtyEntriesBatchSize = newCounter("put-all-dirty-entries-batch-size")
-  val newIterator = newCounter("newitarator")
 
   def setDirtyCount(getValue: () => Int) {
     newGauge("dirty-count", getValue)

http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index defc91e..e5a66a4 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -79,11 +79,6 @@ class KeyValueStorageEngine[K, V](
     wrapperStore.all()
   }
 
-  def newIterator() = {
-    metrics.newIterator.inc
-    wrapperStore.newIterator()
-  }
-
   /**
    * Restore the contents of this key/value store from the change log,
    * batching updates to underlying raw store to skip wrapping functions for efficiency.

http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
index f842b6f..233fba9 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
@@ -33,7 +33,6 @@ class KeyValueStorageEngineMetrics(
   val puts = newCounter("puts")
   val deletes = newCounter("deletes")
   val flushes = newCounter("flushes")
-  val newIterator = newCounter("newiterator")
 
   val restoredMessages = newCounter("messages-restored")
   val restoredBytes = newCounter("messages-bytes")

http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
index c303261..967d509 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
@@ -35,7 +35,6 @@ class KeyValueStoreMetrics(
   val flushes = newCounter("flushes")
   val bytesWritten = newCounter("bytes-written")
   val bytesRead = newCounter("bytes-read")
-  val newIterator = newCounter("newitertor")
 
   override def getPrefix = storeName + "-"
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index e293bfc..7bba6ff 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -56,11 +56,6 @@ class LoggedStore[K, V](
     store.all()
   }
 
-  def newIterator() = {
-    metrics.newIterator.inc
-    store.newIterator()
-  }
-
   /**
    * Perform the local update and log it out to the changelog
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
index f856432..743151a 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
@@ -33,7 +33,6 @@ class LoggedStoreMetrics(
   val puts = newCounter("puts")
   val deletes = newCounter("deletes")
   val flushes = newCounter("flushes")
-  val newIterator = newCounter("newiterator")
 
   override def getPrefix = storeName + "-"
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
index 9d7baaa..3de257c 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -78,10 +78,6 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
     store.all
   }
 
-  def newIterator(): KeyValueIterator[K, V] = {
-    store.newIterator()
-  }
-
   def flush {
     store.flush
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index cf1a2cc..8e183ef 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -97,11 +97,6 @@ class SerializedKeyValueStore[K, V](
     new DeserializingIterator(store.all)
   }
 
-  def newIterator(): KeyValueIterator[K, V] = {
-    metrics.newIterator.inc
-    new DeserializingIterator(store.newIterator())
-  }
-
   private class DeserializingIterator(iter: KeyValueIterator[Array[Byte], Array[Byte]]) extends KeyValueIterator[K, V] {
     def hasNext() = iter.hasNext()
     def remove() = iter.remove()
@@ -112,10 +107,6 @@ class SerializedKeyValueStore[K, V](
       val value = fromBytesOrNull(nxt.getValue, msgSerde)
       new Entry(key, value)
     }
-
-    override def seekToFirst(): Unit = iter.seekToFirst()
-
-    override def seek(key: K): Unit = iter.seek(toBytesOrNull(key, keySerde))
   }
 
   def flush {

http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
index bb745e0..841e4a2 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
@@ -35,7 +35,6 @@ class SerializedKeyValueStoreMetrics(
   val flushes = newCounter("flushes")
   val bytesSerialized = newCounter("bytes-serialized")
   val bytesDeserialized = newCounter("bytes-deserialized")
-  val newIterator = newCounter("newiterator")
 
   override def getPrefix = storeName + "-"
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
index e14a461..595dd0d 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
@@ -58,14 +58,6 @@ class MockKeyValueStore extends KeyValueStore[String, String] {
     override def remove(): Unit = iter.remove()
 
     override def close(): Unit = Unit
-
-    override def seekToFirst(): Unit = {
-      throw new UnsupportedOperationException
-    }
-
-    override def seek(key: String): Unit = {
-      throw new UnsupportedOperationException
-    }
   }
 
   override def range(from: String, to: String): KeyValueIterator[String, String] =
@@ -74,9 +66,6 @@ class MockKeyValueStore extends KeyValueStore[String, String] {
   override def all(): KeyValueIterator[String, String] =
     new MockIterator(kvMap.entrySet().iterator())
 
-  override def newIterator(): KeyValueIterator[String, String] =
-   throw new UnsupportedOperationException
-
   override def flush() {}  // no-op
 
   override def close() { kvMap.clear() }