You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/10/02 19:23:35 UTC

samza git commit: SAMZA-1423; Implement time series storage for joins and windows

Repository: samza
Updated Branches:
  refs/heads/master f16ba2692 -> 56d564c60


SAMZA-1423; Implement time series storage for joins and windows

Notable changes:
* New interface for storing and retrieving time series data.
* New store and serde implementation for use in windows and joins

Pending:
* Documentation and minor clean-ups
* Wire-up of stores from ExecutionPlanner
* Usage of the store to implement various windows and joins

Author: Jagadish <ja...@apache.org>
Author: Jagadish <jv...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@linkedin.com>, Xinyu Liu <xi...@linkedin.com>

Closes #303 from vjagadish1989/window-store


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/56d564c6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/56d564c6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/56d564c6

Branch: refs/heads/master
Commit: 56d564c604c810ce7ea549e7dd4581588f8c47b5
Parents: f16ba26
Author: Jagadish <ja...@apache.org>
Authored: Mon Oct 2 12:23:24 2017 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Oct 2 12:23:24 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../samza/storage/kv/ClosableIterator.java      |  40 ++++
 .../operators/impl/store/TimeSeriesKey.java     |  80 +++++++
 .../impl/store/TimeSeriesKeySerde.java          |  96 +++++++++
 .../operators/impl/store/TimeSeriesStore.java   |  80 +++++++
 .../impl/store/TimeSeriesStoreImpl.java         | 195 +++++++++++++++++
 .../operators/impl/store/TimestampedValue.java  |  61 ++++++
 .../impl/store/TestTimeSeriesKeySerde.java      |  71 +++++++
 .../impl/store/TestTimeSeriesStoreImpl.java     | 210 +++++++++++++++++++
 9 files changed, 834 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 16091ae..b10241a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -178,6 +178,7 @@ project(":samza-core_$scalaVersion") {
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     compile "org.slf4j:slf4j-api:$slf4jVersion"
     testCompile project(":samza-api").sourceSets.test.output
+    testCompile project(":samza-kv-rocksdb_$scalaVersion")
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
     testCompile "org.powermock:powermock-api-mockito:$powerMockVersion"

http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-api/src/main/java/org/apache/samza/storage/kv/ClosableIterator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/ClosableIterator.java b/samza-api/src/main/java/org/apache/samza/storage/kv/ClosableIterator.java
new file mode 100644
index 0000000..cc215e6
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/ClosableIterator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import java.util.Iterator;
+
+/**
+ * An {@link Iterator} that holds resources which must be released once iteration is finished.
+ *
+ * <p>
+ * Implementations release whatever the iterator has acquired, such as open file handles or persistent state,
+ * in {@link #close()}.
+ *
+ * @param <V> the type of element produced by this iterator
+ */
+public interface ClosableIterator<V> extends Iterator<V> {
+
+  /**
+   * Releases the resources held by this iterator.
+   *
+   * <p>
+   * After this method returns, calling {@link #hasNext()} or {@link #next()} is illegal.
+   */
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java
new file mode 100644
index 0000000..4ed73aa
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.operators.impl.store;
+
+/**
+ * The key under which a {@link TimeSeriesStore} stores a single row.
+ *
+ * <p>
+ * It combines the user-supplied key with a timestamp and a sequence number; the three together identify a row
+ * uniquely. All fields are final.
+ *
+ * @param <K> the type of the user-supplied key
+ */
+public class TimeSeriesKey<K> {
+
+  // format version, kept so the key layout can evolve in a backwards-compatible way
+  private static final byte VERSION = 0x00;
+
+  private final K key;
+  private final long timestamp;
+  private final long seqNum;
+
+  /**
+   * @param k the user-supplied key; may be {@code null}
+   * @param time the timestamp of the row
+   * @param seq the sequence number distinguishing rows that share a key and timestamp
+   */
+  public TimeSeriesKey(K k, long time, long seq) {
+    this.key = k;
+    this.timestamp = time;
+    this.seqNum = seq;
+  }
+
+  public K getKey() {
+    return key;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public long getSeqNum() {
+    return seqNum;
+  }
+
+  public byte getVersion() {
+    return VERSION;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    TimeSeriesKey<?> other = (TimeSeriesKey<?>) o;
+    // compare the cheap primitive fields before delegating to the key's equals
+    return timestamp == other.timestamp
+        && seqNum == other.seqNum
+        && (key == null ? other.key == null : key.equals(other.key));
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = (key == null) ? 0 : key.hashCode();
+    hash = 31 * hash + Long.hashCode(timestamp);
+    return 31 * hash + Long.hashCode(seqNum);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("TimeSeriesKey {key: %s timestamp: %s seqNum: %s}", key, timestamp, seqNum);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java
new file mode 100644
index 0000000..273c40a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.operators.impl.store;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.Serde;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link Serde} for {@link TimeSeriesKey}s.
+ *
+ * <p>
+ * This wraps the actual key's serde with serializers for timestamp, version number and sequence number.
+ * A {@link TimeSeriesKeySerde} serializes a key as follows:
+ * <pre>
+ *    +-------------------------+------------------+----------------+------------------+
+ *    |  serialized-key bytes   |  timestamp       | version (0)    | seqNum           |
+ *    |(serialized by keySerde) |                  |                |                  |
+ *    +-------------------------+------------------+----------------+------------------+
+ *    +---serialized key len----+-------8 bytes----+----1 byte------+----7 bytes-------+
+ * </pre>
+ *
+ * <p>
+ * Notes:
+ * <ul>
+ *   <li>The version byte and the sequence number share one big-endian 8-byte long. Only the low 7 bytes (56 bits)
+ *   of the sequence number are stored; any higher bits are discarded.</li>
+ *   <li>A key whose serialized form is {@code null} or empty occupies zero bytes, and therefore deserializes to a
+ *   {@code null} key.</li>
+ * </ul>
+ *
+ * @param <K> the type of the wrapped key
+ */
+public class TimeSeriesKeySerde<K> implements Serde<TimeSeriesKey<K>> {
+
+  private static final long SEQUENCE_NUM_MASK = 0x00ffffffffffffffL;
+  // the version byte occupies the most significant byte of the version+seqNum long
+  private static final int VERSION_SHIFT = 56;
+  private static final int TIMESTAMP_SIZE = 8;
+  private static final int SEQNUM_SIZE = 8;
+
+  private final Serde<K> keySerde;
+
+  /**
+   * @param keySerde the serde used for the user-supplied part of the key
+   */
+  public TimeSeriesKeySerde(Serde<K> keySerde) {
+    this.keySerde = keySerde;
+  }
+
+  @Override
+  public byte[] toBytes(TimeSeriesKey<K> timeSeriesKey) {
+    K key = timeSeriesKey.getKey();
+    long timestamp = timeSeriesKey.getTimestamp();
+    long seqNum = timeSeriesKey.getSeqNum();
+
+    byte[] serializedKey = keySerde.toBytes(key);
+    int keySize = serializedKey == null ? 0 : serializedKey.length;
+
+    // append the timestamp, followed by the version byte and the 7-byte sequence number packed into one long
+    long versionBits = ((long) (timeSeriesKey.getVersion() & 0xFF)) << VERSION_SHIFT;
+    ByteBuffer buf = ByteBuffer.allocate(keySize + TIMESTAMP_SIZE + SEQNUM_SIZE);
+    if (serializedKey != null) {
+      buf.put(serializedKey);
+    }
+    buf.putLong(timestamp);
+    buf.putLong(versionBits | (seqNum & SEQUENCE_NUM_MASK));
+
+    return buf.array();
+  }
+
+  /**
+   * Deserializes bytes produced by {@link #toBytes}.
+   *
+   * @throws SamzaException if the input is too short to hold a timestamp and sequence number, or if it was written
+   *                        with an unsupported version
+   */
+  @Override
+  public TimeSeriesKey<K> fromBytes(byte[] timeSeriesKeyBytes) {
+    int keySize = timeSeriesKeyBytes.length - TIMESTAMP_SIZE - SEQNUM_SIZE;
+    if (keySize < 0) {
+      throw new SamzaException(String.format("Serialized TimeSeriesKey has %d bytes; expected at least %d",
+          timeSeriesKeyBytes.length, TIMESTAMP_SIZE + SEQNUM_SIZE));
+    }
+
+    // First obtain the key bytes, and deserialize them. Later de-serialize the timestamp and sequence number
+    ByteBuffer buf = ByteBuffer.wrap(timeSeriesKeyBytes);
+    K key = null;
+    if (keySize > 0) {
+      byte[] keyBytes = new byte[keySize];
+      buf.get(keyBytes);
+      key = keySerde.fromBytes(keyBytes);
+    }
+
+    long timeStamp = buf.getLong();
+    long seqNum = buf.getLong();
+    long version = seqNum >>> VERSION_SHIFT;
+
+    if (version != 0) {
+      throw new SamzaException("Version is not zero. Sequence number: " + seqNum);
+    }
+    return new TimeSeriesKey<>(key, timeStamp, seqNum & SEQUENCE_NUM_MASK);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
new file mode 100644
index 0000000..e544e2e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.operators.impl.store;
+
+import org.apache.samza.storage.kv.ClosableIterator;
+
+/**
+ * A key-value store that allows entries to be queried and stored based on time ranges.
+ *
+ * <p>
+ * Operations on the store can be invoked from multiple threads. Hence, implementations are expected to be thread-safe.
+ *
+ * @param <K> the type of key in the store
+ * @param <V> the type of value in the store
+ */
+public interface TimeSeriesStore<K, V> {
+
+  /**
+   * Insert a key and the value in the store with the provided timestamp.
+   *
+   * @param key the key to insert
+   * @param val the value to insert
+   * @param timestamp the timestamp in milliseconds
+   */
+  void put(K key, V val, long timestamp);
+
+  /**
+   * Returns an iterator over values for the given key in the provided time-range - [{@code startTimestamp}, {@code endTimestamp})
+   *
+   * <p>
+   * Values returned by the iterator are ordered by their timestamp. Values with the same timestamp are
+   * returned in their order of insertion.
+   *
+   * <p> The returned iterator <b>must</b> be closed after use by calling {@link ClosableIterator#close()}.
+   * Not doing so will result in memory leaks.
+   *
+   * @param key the key to look up in the store
+   * @param startTimestamp the start timestamp of the range, inclusive
+   * @param endTimestamp the end timestamp of the range, exclusive
+   * @return an iterator over the matching values paired with their timestamps; the caller must close it
+   * @throws IllegalArgumentException when startTimestamp &gt; endTimestamp, or when either of them is negative
+   */
+  ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, long endTimestamp);
+
+  /**
+   * Removes all values for this key in the given time-range - [{@code startTimestamp}, {@code endTimeStamp}).
+   *
+   * @param key the key to look up in the store
+   * @param startTimestamp the start timestamp of the range, inclusive
+   * @param endTimeStamp the end timestamp of the range, exclusive
+   * @throws IllegalArgumentException when startTimestamp &gt; endTimeStamp, or when either of them is negative
+   */
+  void remove(K key, long startTimestamp, long endTimeStamp);
+
+  /**
+   * Flushes this time series store, if applicable.
+   */
+  void flush();
+
+  /**
+   * Closes this store.
+   *
+   * <p>
+   * Use this to perform final clean-ups, release acquired resources etc.
+   */
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
new file mode 100644
index 0000000..5e35219
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.operators.impl.store;
+
+import org.apache.samza.storage.kv.ClosableIterator;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterator;
+import org.apache.samza.storage.kv.KeyValueStore;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Provides a view on top of a {@link KeyValueStore} that allows retrieval of entries by time ranges.
+ *
+ * <p>
+ * A {@link TimeSeriesStoreImpl} can be backed by persistent stores like rocksDB, in-memory stores, change-logged
+ * stores, cached stores (or any combination of these).
+ *
+ * <p>
+ * Range iterators in the store return values in the order of their timestamp. Within the same key and timestamp,
+ * values are returned in their order of insertion.
+ *
+ * <p>
+ * This store has two modes of operation depending on how duplicates are handled:
+ * <ol>
+ *   <li>
+ *     Overwrite Mode: In this mode, the store only retains the most recent value for a given key and timestamp. I.e.,Calling
+ *     {@link #put} on an existing key and timestamp will overwrite the previously stored value for that key.
+ *   </li>
+ *   <li>
+ *     Append Mode: In this mode, the store retains all previous values for a given key and timestamp. I.e., Calling {@link #put}
+ *     with an existing key and timestamp will append the value to the list.
+ *   </li>
+ * </ol>
+ * <p>
+ * Implementation Notes:
+ *
+ * Data is serialized and organized into K-V pairs as follows:
+ *  <pre>
+ *    +-----------------------+------------------+------------+------------------------+-----------------------+
+ *    |  serialized-key bytes |  timestamp       | version    |       sequence number  | serialized value      |
+ *    |                       |                  |            |                        |                       |
+ *    +-----------------------+------------------+------------+------------------------+-----------------------+
+ *    +----------------------+--------8 bytes----+----1 bytes-+---------7 bytes--------+----value size----------
+ *    +----------------------------------STORE KEY-------------------------------------+---STORE VAL-----------+
+ *  </pre>
+ * An 8 byte timestamp, a one byte version and a 7 byte sequence number are appended to the provided key and this
+ * combination is used as the key in the k-v store. The provided value is stored as is.
+ *
+ * <p> This class is thread-safe and concurrent reads/writes are expected.
+ *
+ * @param <K> the type of key in the store
+ * @param <V> the type of value in the store
+ */
+public class TimeSeriesStoreImpl<K, V> implements TimeSeriesStore<K, V> {
+
+  private final KeyValueStore<TimeSeriesKey<K>, V> kvStore;
+
+  /**
+   * Since timestamps are at the granularity of milliseconds, multiple entries added in the same
+   * millisecond are distinguished by a monotonically increasing sequence number.
+   */
+  private final AtomicLong seqNum = new AtomicLong();
+  private final boolean appendMode;
+
+  /**
+   * Creates a {@link TimeSeriesStoreImpl}
+   *
+   * @param kvStore the backing kv store to use
+   * @param appendMode should the store be used in appendMode
+   */
+  public TimeSeriesStoreImpl(KeyValueStore<TimeSeriesKey<K>, V> kvStore, boolean appendMode) {
+    this.kvStore = kvStore;
+    this.appendMode = appendMode;
+  }
+
+  /**
+   * Creates a {@link TimeSeriesStoreImpl} in append mode.
+   *
+   * @param kvStore the backing kv store to use
+   */
+  public TimeSeriesStoreImpl(KeyValueStore<TimeSeriesKey<K>, V> kvStore) {
+    this(kvStore, true);
+  }
+
+  @Override
+  public void put(K key, V val, long timestamp) {
+    // For append mode, values are differentiated by an unique sequence number. For overwrite mode, the sequence
+    // number is always zero. This ensures that only the most recent value is retained.
+    if (appendMode) {
+      seqNum.getAndIncrement();
+    }
+    TimeSeriesKey<K> timeSeriesKey = new TimeSeriesKey<>(key, timestamp, seqNum.get());
+    kvStore.put(timeSeriesKey, val);
+  }
+
+  @Override
+  public ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, long endTimestamp) {
+    validateRange(startTimestamp, endTimestamp);
+    TimeSeriesKey<K> fromKey = new TimeSeriesKey(key, startTimestamp, 0);
+    TimeSeriesKey<K> toKey = new TimeSeriesKey(key, endTimestamp, 0);
+
+    KeyValueIterator<TimeSeriesKey<K>, V> range = kvStore.range(fromKey, toKey);
+    return new TimeSeriesStoreIterator<>(range);
+  }
+
+  @Override
+  public void remove(K key, long startTimestamp, long endTimeStamp) {
+    validateRange(startTimestamp, endTimeStamp);
+    TimeSeriesKey<K> fromKey = new TimeSeriesKey(key, startTimestamp, 0);
+    TimeSeriesKey<K> toKey = new TimeSeriesKey(key, endTimeStamp, 0);
+
+    List<TimeSeriesKey<K>> keysToDelete = new LinkedList<>();
+
+    KeyValueIterator<TimeSeriesKey<K>, V> range = kvStore.range(fromKey, toKey);
+    while (range.hasNext()) {
+      keysToDelete.add(range.next().getKey());
+    }
+
+    kvStore.deleteAll(keysToDelete);
+  }
+
+  @Override
+  public void flush() {
+    kvStore.flush();
+  }
+
+  @Override
+  public void close() {
+    kvStore.close();
+  }
+
+  private void validateRange(long startTimestamp, long endTimestamp) throws IllegalArgumentException {
+    if (startTimestamp < 0) {
+      throw new IllegalArgumentException(String.format("Start timestamp :%d is less than zero", startTimestamp));
+    }
+
+    if (endTimestamp < 0) {
+      throw new IllegalArgumentException(String.format("End timestamp :%d is less than zero", endTimestamp));
+    }
+
+    if (endTimestamp < startTimestamp) {
+      throw new IllegalArgumentException(String.format("End timestamp :%d is less than start timestamp: %d", endTimestamp, startTimestamp));
+    }
+  }
+
+  private static class TimeSeriesStoreIterator<K, V> implements ClosableIterator<TimestampedValue<V>> {
+
+    private final KeyValueIterator<TimeSeriesKey<K>, V> wrappedIterator;
+
+    public TimeSeriesStoreIterator(KeyValueIterator<TimeSeriesKey<K>, V> wrappedIterator) {
+      this.wrappedIterator = wrappedIterator;
+    }
+
+    @Override
+    public void close() {
+      wrappedIterator.close();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return wrappedIterator.hasNext();
+    }
+
+    @Override
+    public TimestampedValue<V> next() {
+      Entry<TimeSeriesKey<K>, V> next = wrappedIterator.next();
+      return new TimestampedValue<>(next.getValue(), next.getKey().getTimestamp());
+    }
+
+    @Override
+    public void remove() {
+      wrappedIterator.remove();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
new file mode 100644
index 0000000..ad5e844
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.operators.impl.store;
+
+/**
+ * An immutable pair of a value, and its corresponding timestamp.
+ *
+ * @param <V> the type of the value
+ */
+public class TimestampedValue<V> {
+  private final V value;
+  private final Long timestamp;
+
+  /**
+   * @param v the value; may be {@code null}
+   * @param time the timestamp of the value; not checked for {@code null}
+   */
+  public TimestampedValue(V v, Long time) {
+    value = v;
+    timestamp = time;
+  }
+
+  public V getValue() {
+    return value;
+  }
+
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || !getClass().equals(o.getClass())) return false;
+
+    TimestampedValue<?> that = (TimestampedValue<?>) o;
+
+    // null-safe on both fields: the constructor accepts null for either of them
+    return java.util.Objects.equals(value, that.value) && java.util.Objects.equals(timestamp, that.timestamp);
+  }
+
+  @Override
+  public int hashCode() {
+    // a null field contributes 0 to the hash rather than throwing
+    return 31 * java.util.Objects.hashCode(value) + java.util.Objects.hashCode(timestamp);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("TimestampedValue {value: %s timestamp: %s}", value, timestamp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesKeySerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesKeySerde.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesKeySerde.java
new file mode 100644
index 0000000..3c1df9c
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesKeySerde.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.operators.impl.store;
+
+import org.apache.samza.serializers.LongSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+
+public class TestTimeSeriesKeySerde {
+
+  @Test
+  public void testStringTimeSeriesKey() {
+    TimeSeriesKeySerde<String> serde = new TimeSeriesKeySerde<>(new StringSerde("UTF-8"));
+    assertRoundTrip(new TimeSeriesKey<String>("test", 1, 23), serde);
+  }
+
+  @Test
+  public void testNullTimeSeriesKey() {
+    TimeSeriesKeySerde<String> serde = new TimeSeriesKeySerde<>(new StringSerde("UTF-8"));
+    assertRoundTrip(new TimeSeriesKey<String>(null, 1, 23), serde);
+  }
+
+  @Test
+  public void testLongTimeSeriesKey() {
+    TimeSeriesKeySerde<Long> serde = new TimeSeriesKeySerde<>(new LongSerde());
+    assertRoundTrip(new TimeSeriesKey<Long>(30L, 1, 23), serde);
+  }
+
+  /**
+   * Serializes {@code original} with {@code serde}, deserializes the bytes again, and checks that the key,
+   * sequence number and timestamp all survived the round trip, as well as the key object as a whole.
+   */
+  private static <T> void assertRoundTrip(TimeSeriesKey<T> original, TimeSeriesKeySerde<T> serde) {
+    byte[] bytes = serde.toBytes(original);
+    TimeSeriesKey<T> restored = serde.fromBytes(bytes);
+
+    assertEquals(original.getKey(), restored.getKey());
+    assertEquals(original.getSeqNum(), restored.getSeqNum());
+    assertEquals(original.getTimestamp(), restored.getTimestamp());
+    assertEquals(original, restored);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
new file mode 100644
index 0000000..62304f3
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.operators.impl.store;
+
+import com.google.common.io.Files;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.ByteSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.ClosableIterator;
+import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+import org.apache.samza.storage.kv.RocksDbKeyValueStore;
+import org.apache.samza.storage.kv.SerializedKeyValueStore;
+import org.apache.samza.storage.kv.SerializedKeyValueStoreMetrics;
+import org.junit.Assert;
+import org.junit.Test;
+import org.rocksdb.CompressionType;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestTimeSeriesStoreImpl {
+
+  @Test
+  public void testGetOnTimestampBoundaries() {
+    TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), true);
+
+    // insert an entry with key "hello" at timestamps "1" and "2"
+    timeSeriesStore.put("hello", "world-1".getBytes(), 1L);
+    timeSeriesStore.put("hello", "world-1".getBytes(), 2L);
+    timeSeriesStore.put("hello", "world-2".getBytes(), 2L);
+
+    // read from time-range
+    List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 0L, 1L);
+    Assert.assertEquals(values.size(), 0);
+
+    // read from time-range [1,2) should return one entry
+    values = readStore(timeSeriesStore, "hello", 1L, 2L);
+    Assert.assertEquals(values.size(), 1);
+    Assert.assertEquals(new String(values.get(0).getValue()), "world-1");
+
+    // read from time-range [2,3) should return two entries
+    values = readStore(timeSeriesStore, "hello", 2L, 3L);
+    Assert.assertEquals(values.size(), 2);
+    Assert.assertEquals(new String(values.get(0).getValue()), "world-1");
+    Assert.assertEquals(values.get(0).getTimestamp(), new Long(2));
+
+    // read from time-range [0,3) should return three entries
+    values = readStore(timeSeriesStore, "hello", 0L, 3L);
+    Assert.assertEquals(values.size(), 3);
+
+    // read from time-range [2,999999) should return two entries
+    values = readStore(timeSeriesStore, "hello", 2L, 999999L);
+    Assert.assertEquals(values.size(), 2);
+
+    // read from time-range [3,4) should return no entries
+    values = readStore(timeSeriesStore, "hello", 3L, 4L);
+    Assert.assertEquals(values.size(), 0);
+  }
+
+  @Test
+  public void testGetWithNonExistentKeys() {
+    TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), true);
+    timeSeriesStore.put("hello", "world-1".getBytes(), 1L);
+
+    // read from a non-existent key
+    List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "non-existent-key", 0, Integer.MAX_VALUE);
+    Assert.assertEquals(values.size(), 0);
+
+    // read from an existing key but out of range timestamp
+    values = readStore(timeSeriesStore, "hello", 2, Integer.MAX_VALUE);
+    Assert.assertEquals(values.size(), 0);
+  }
+
+  @Test
+  public void testPutWithMultipleEntries() {
+    TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), true);
+
+    // insert 100 entries at timestamps "1" and "2"
+    for (int i = 0; i < 100; i++) {
+      timeSeriesStore.put("hello", "world-1".getBytes(), 1L);
+      timeSeriesStore.put("hello", "world-2".getBytes(), 2L);
+    }
+
+    // read from time-range [0,2) should return 100 entries
+    List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 0L, 2L);
+    Assert.assertEquals(values.size(), 100);
+    values.forEach(timeSeriesValue -> {
+        Assert.assertEquals(new String(timeSeriesValue.getValue()), "world-1");
+      });
+
+    // read from time-range [2,4) should return 100 entries
+    values = readStore(timeSeriesStore, "hello", 2L, 4L);
+    Assert.assertEquals(values.size(), 100);
+    values.forEach(timeSeriesValue -> {
+        Assert.assertEquals(new String(timeSeriesValue.getValue()), "world-2");
+      });
+
+    // read all entries in the store
+    values = readStore(timeSeriesStore, "hello", 0L, Integer.MAX_VALUE);
+    Assert.assertEquals(values.size(), 200);
+  }
+
+  @Test
+  public void testGetOnTimestampBoundariesWithOverwriteMode() {
+    // instantiate a store in overwrite mode
+    TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), false);
+
+    // insert an entry with key "hello" at timestamps "1" and "2"
+    timeSeriesStore.put("hello", "world-1".getBytes(), 1L);
+    timeSeriesStore.put("hello", "world-1".getBytes(), 2L);
+    timeSeriesStore.put("hello", "world-2".getBytes(), 2L);
+
+    // read from time-range
+    List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 0L, 1L);
+    Assert.assertEquals(values.size(), 0);
+
+    // read from time-range [1,2) should return one entry
+    values = readStore(timeSeriesStore, "hello", 1L, 2L);
+    Assert.assertEquals(values.size(), 1);
+    Assert.assertEquals(new String(values.get(0).getValue()), "world-1");
+
+    // read from time-range [2,3) should return the most recent entry
+    values = readStore(timeSeriesStore, "hello", 2L, 3L);
+    Assert.assertEquals(values.size(), 1);
+    Assert.assertEquals(new String(values.get(0).getValue()), "world-2");
+    Assert.assertEquals(values.get(0).getTimestamp(), new Long(2));
+
+    // read from time-range [0,3) should return two entries
+    values = readStore(timeSeriesStore, "hello", 0L, 3L);
+    Assert.assertEquals(values.size(), 2);
+
+    // read from time-range [2,999999) should return one entry
+    values = readStore(timeSeriesStore, "hello", 2L, 999999L);
+    Assert.assertEquals(values.size(), 1);
+
+    // read from time-range [3,4) should return no entries
+    values = readStore(timeSeriesStore, "hello", 3L, 4L);
+    Assert.assertEquals(values.size(), 0);
+  }
+
+  @Test
+  public void testDeletesInOverwriteMode() {
+    // instantiate a store in overwrite mode
+    TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), false);
+
+    // insert an entry with key "hello" at timestamps "1" and "2"
+    timeSeriesStore.put("hello", "world-1".getBytes(), 1L);
+    timeSeriesStore.put("hello", "world-1".getBytes(), 2L);
+    timeSeriesStore.put("hello", "world-2".getBytes(), 2L);
+
+    List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 1L, 3L);
+    Assert.assertEquals(values.size(), 2);
+
+    timeSeriesStore.remove("hello", 0L, 3L);
+    values = readStore(timeSeriesStore, "hello", 1L, 3L);
+    Assert.assertEquals(values.size(), 0);
+  }
+
+  private static <K, V> List<TimestampedValue<V>> readStore(TimeSeriesStore<K, V> store, K key, long startTimestamp, long endTimestamp) {
+    List<TimestampedValue<V>> list = new ArrayList<>();
+    ClosableIterator<TimestampedValue<V>> storeValuesIterator = store.get(key, startTimestamp, endTimestamp);
+
+    while (storeValuesIterator.hasNext()) {
+      TimestampedValue<V> next = storeValuesIterator.next();
+      list.add(next);
+    }
+
+    storeValuesIterator.close();
+    return list;
+  }
+
+  private static <K> TimeSeriesStore<K, byte[]> newTimeSeriesStore(String storeName, Serde<K> keySerde, boolean appendMode) {
+    RocksDbKeyValueStore rocksKVStore = newRocksDbStore("someStore");
+    SerializedKeyValueStore<TimeSeriesKey<K>, byte[]> kvStore = new SerializedKeyValueStore<>(rocksKVStore,
+            new TimeSeriesKeySerde<>(keySerde), new ByteSerde(),
+            new SerializedKeyValueStoreMetrics("", new MetricsRegistryMap()));
+    return new TimeSeriesStoreImpl<>(kvStore, appendMode);
+  }
+
+  private static RocksDbKeyValueStore newRocksDbStore(String storeName) {
+    File dir = Files.createTempDir();
+    return new RocksDbKeyValueStore(dir,
+            new Options().setCreateIfMissing(true).setCompressionType(CompressionType.SNAPPY_COMPRESSION), new MapConfig(),
+            false, storeName, new WriteOptions(), new FlushOptions(),
+            new KeyValueStoreMetrics(storeName, new MetricsRegistryMap()));
+  }
+}