You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/04/29 01:03:50 UTC
samza git commit: SAMZA-938: reverting the APIs added in SAMZA-813 to
keep backward compatibility in 0.10.1
Repository: samza
Updated Branches:
refs/heads/master efa50a790 -> 373181c57
SAMZA-938: reverting the APIs added in SAMZA-813 to keep backward compatibility in 0.10.1
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/373181c5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/373181c5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/373181c5
Branch: refs/heads/master
Commit: 373181c5792b34d2c9c86ad1859cb01722584992
Parents: efa50a7
Author: Jacob Maes <ja...@gmail.com>
Authored: Thu Apr 28 16:02:47 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu Apr 28 16:02:47 2016 -0700
----------------------------------------------------------------------
.../samza/storage/kv/KeyValueIterator.java | 2 --
.../apache/samza/storage/kv/KeyValueStore.java | 7 -------
.../kv/inmemory/InMemoryKeyValueStore.scala | 16 ----------------
.../samza/storage/kv/RocksDbKeyValueStore.scala | 19 +------------------
.../apache/samza/storage/kv/CachedStore.scala | 10 ----------
.../samza/storage/kv/CachedStoreMetrics.scala | 1 -
.../samza/storage/kv/KeyValueStorageEngine.scala | 5 -----
.../kv/KeyValueStorageEngineMetrics.scala | 1 -
.../samza/storage/kv/KeyValueStoreMetrics.scala | 1 -
.../apache/samza/storage/kv/LoggedStore.scala | 5 -----
.../samza/storage/kv/LoggedStoreMetrics.scala | 1 -
.../samza/storage/kv/NullSafeKeyValueStore.scala | 4 ----
.../storage/kv/SerializedKeyValueStore.scala | 9 ---------
.../kv/SerializedKeyValueStoreMetrics.scala | 1 -
.../samza/storage/kv/MockKeyValueStore.scala | 11 -----------
15 files changed, 1 insertion(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
index aa1f88c..854ebbf 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterator.java
@@ -23,6 +23,4 @@ import java.util.Iterator;
public interface KeyValueIterator<K, V> extends Iterator<Entry<K, V>> {
public void close();
- public void seekToFirst();
- public void seek(K key);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
index 7d4d353..b1fea7b 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
@@ -103,13 +103,6 @@ public interface KeyValueStore<K, V> {
KeyValueIterator<K, V> all();
/**
- * Return an iterator which is yet to be positions. This iterator must be positioned
- * first before a call to next() is made. This iterator MUST be closed after use.
- * @return An iterator
- */
- KeyValueIterator<K, V> newIterator();
-
- /**
* Closes this key-value store, if applicable, relinquishing any underlying resources.
*/
void close();
http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
index 661a835..72f25a3 100644
--- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
@@ -61,18 +61,6 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
}
override def hasNext: Boolean = iter.hasNext
-
- /*
- * This method is supposed to be called only after an iterator is created first
- * using the store's newIterator position. For some stores, the creation of the
- * */
- override def seekToFirst(): Unit = {
- throw new UnsupportedOperationException
- }
-
- override def seek(key: Array[Byte]): Unit = {
- throw new UnsupportedOperationException
- }
}
override def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
@@ -132,8 +120,4 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
override def getAll(keys: java.util.List[Array[Byte]]): java.util.Map[Array[Byte], Array[Byte]] = {
KeyValueStore.Extension.getAll(this, keys);
}
-
- override def newIterator(): KeyValueIterator[Array[Byte], Array[Byte]] = {
- throw new UnsupportedOperationException
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index b896810..f0965ae 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -194,11 +194,6 @@ class RocksDbKeyValueStore(
new RocksDbIterator(iter)
}
- def newIterator(): KeyValueIterator[Array[Byte], Array[Byte]] = {
- metrics.newIterator.inc
- new RocksDbIterator(db.newIterator);
- }
-
def flush {
metrics.flushes.inc
trace("Flushing.")
@@ -211,7 +206,7 @@ class RocksDbKeyValueStore(
}
class RocksDbIterator(iter: RocksIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] {
- private var open = iter.isValid
+ private var open = true
private var firstValueAccessed = false;
def close() = {
open = false
@@ -253,18 +248,6 @@ class RocksDbKeyValueStore(
entry
}
- def seekToFirst: Unit = {
- metrics.alls.inc()
- iter.seekToFirst()
- open = true
- }
-
- def seek(target: Array[Byte]): Unit = {
- metrics.alls.inc()
- iter.seek(target)
- open = true
- }
-
override def finalize() {
if (open) {
trace("Leaked reference to RocksDB iterator, forcing close.")
http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 4c4e82e..c28f8db 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -124,10 +124,6 @@ class CachedStore[K, V](
}
override def hasNext: Boolean = iter.hasNext
-
- override def seekToFirst(): Unit = iter.seekToFirst()
-
- override def seek(key: K): Unit = iter.seek(key)
}
override def range(from: K, to: K): KeyValueIterator[K, V] = {
@@ -144,12 +140,6 @@ class CachedStore[K, V](
new CachedStoreIterator(store.all())
}
- override def newIterator(): KeyValueIterator[K, V] = {
- metrics.newIterator.inc
- flush()
- new CachedStoreIterator(store.newIterator())
- }
-
override def put(key: K, value: V) {
metrics.puts.inc
http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
index 04109c9..a11b5fe 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
@@ -35,7 +35,6 @@ class CachedStoreMetrics(
val deletes = newCounter("deletes")
val flushes = newCounter("flushes")
val putAllDirtyEntriesBatchSize = newCounter("put-all-dirty-entries-batch-size")
- val newIterator = newCounter("newitarator")
def setDirtyCount(getValue: () => Int) {
newGauge("dirty-count", getValue)
http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index defc91e..e5a66a4 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -79,11 +79,6 @@ class KeyValueStorageEngine[K, V](
wrapperStore.all()
}
- def newIterator() = {
- metrics.newIterator.inc
- wrapperStore.newIterator()
- }
-
/**
* Restore the contents of this key/value store from the change log,
* batching updates to underlying raw store to skip wrapping functions for efficiency.
http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
index f842b6f..233fba9 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
@@ -33,7 +33,6 @@ class KeyValueStorageEngineMetrics(
val puts = newCounter("puts")
val deletes = newCounter("deletes")
val flushes = newCounter("flushes")
- val newIterator = newCounter("newiterator")
val restoredMessages = newCounter("messages-restored")
val restoredBytes = newCounter("messages-bytes")
http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
index c303261..967d509 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
@@ -35,7 +35,6 @@ class KeyValueStoreMetrics(
val flushes = newCounter("flushes")
val bytesWritten = newCounter("bytes-written")
val bytesRead = newCounter("bytes-read")
- val newIterator = newCounter("newitertor")
override def getPrefix = storeName + "-"
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index e293bfc..7bba6ff 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -56,11 +56,6 @@ class LoggedStore[K, V](
store.all()
}
- def newIterator() = {
- metrics.newIterator.inc
- store.newIterator()
- }
-
/**
* Perform the local update and log it out to the changelog
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
index f856432..743151a 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
@@ -33,7 +33,6 @@ class LoggedStoreMetrics(
val puts = newCounter("puts")
val deletes = newCounter("deletes")
val flushes = newCounter("flushes")
- val newIterator = newCounter("newiterator")
override def getPrefix = storeName + "-"
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
index 9d7baaa..3de257c 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -78,10 +78,6 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
store.all
}
- def newIterator(): KeyValueIterator[K, V] = {
- store.newIterator()
- }
-
def flush {
store.flush
}
http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index cf1a2cc..8e183ef 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -97,11 +97,6 @@ class SerializedKeyValueStore[K, V](
new DeserializingIterator(store.all)
}
- def newIterator(): KeyValueIterator[K, V] = {
- metrics.newIterator.inc
- new DeserializingIterator(store.newIterator())
- }
-
private class DeserializingIterator(iter: KeyValueIterator[Array[Byte], Array[Byte]]) extends KeyValueIterator[K, V] {
def hasNext() = iter.hasNext()
def remove() = iter.remove()
@@ -112,10 +107,6 @@ class SerializedKeyValueStore[K, V](
val value = fromBytesOrNull(nxt.getValue, msgSerde)
new Entry(key, value)
}
-
- override def seekToFirst(): Unit = iter.seekToFirst()
-
- override def seek(key: K): Unit = iter.seek(toBytesOrNull(key, keySerde))
}
def flush {
http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
index bb745e0..841e4a2 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
@@ -35,7 +35,6 @@ class SerializedKeyValueStoreMetrics(
val flushes = newCounter("flushes")
val bytesSerialized = newCounter("bytes-serialized")
val bytesDeserialized = newCounter("bytes-deserialized")
- val newIterator = newCounter("newiterator")
override def getPrefix = storeName + "-"
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/373181c5/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
index e14a461..595dd0d 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
@@ -58,14 +58,6 @@ class MockKeyValueStore extends KeyValueStore[String, String] {
override def remove(): Unit = iter.remove()
override def close(): Unit = Unit
-
- override def seekToFirst(): Unit = {
- throw new UnsupportedOperationException
- }
-
- override def seek(key: String): Unit = {
- throw new UnsupportedOperationException
- }
}
override def range(from: String, to: String): KeyValueIterator[String, String] =
@@ -74,9 +66,6 @@ class MockKeyValueStore extends KeyValueStore[String, String] {
override def all(): KeyValueIterator[String, String] =
new MockIterator(kvMap.entrySet().iterator())
- override def newIterator(): KeyValueIterator[String, String] =
- throw new UnsupportedOperationException
-
override def flush() {} // no-op
override def close() { kvMap.clear() }