You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/10/02 19:23:35 UTC
samza git commit: SAMZA-1423; Implement time series storage for joins and windows
Repository: samza
Updated Branches:
refs/heads/master f16ba2692 -> 56d564c60
SAMZA-1423; Implement time series storage for joins and windows
Notable changes:
* New interface for storing and retrieving time series data.
* New store and serde implementation for use in windows and joins
Pending:
* Documentation and minor clean-ups
* Wire-up of stores from ExecutionPlanner
* Usage of the store to implement various windows and joins
Author: Jagadish <ja...@apache.org>
Author: Jagadish <jv...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>, Xinyu Liu <xi...@linkedin.com>
Closes #303 from vjagadish1989/window-store
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/56d564c6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/56d564c6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/56d564c6
Branch: refs/heads/master
Commit: 56d564c604c810ce7ea549e7dd4581588f8c47b5
Parents: f16ba26
Author: Jagadish <ja...@apache.org>
Authored: Mon Oct 2 12:23:24 2017 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Oct 2 12:23:24 2017 -0700
----------------------------------------------------------------------
build.gradle | 1 +
.../samza/storage/kv/ClosableIterator.java | 40 ++++
.../operators/impl/store/TimeSeriesKey.java | 80 +++++++
.../impl/store/TimeSeriesKeySerde.java | 96 +++++++++
.../operators/impl/store/TimeSeriesStore.java | 80 +++++++
.../impl/store/TimeSeriesStoreImpl.java | 195 +++++++++++++++++
.../operators/impl/store/TimestampedValue.java | 61 ++++++
.../impl/store/TestTimeSeriesKeySerde.java | 71 +++++++
.../impl/store/TestTimeSeriesStoreImpl.java | 210 +++++++++++++++++++
9 files changed, 834 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 16091ae..b10241a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -178,6 +178,7 @@ project(":samza-core_$scalaVersion") {
compile "org.scala-lang:scala-library:$scalaLibVersion"
compile "org.slf4j:slf4j-api:$slf4jVersion"
testCompile project(":samza-api").sourceSets.test.output
+ testCompile project(":samza-kv-rocksdb_$scalaVersion")
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
testCompile "org.powermock:powermock-api-mockito:$powerMockVersion"
http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-api/src/main/java/org/apache/samza/storage/kv/ClosableIterator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/ClosableIterator.java b/samza-api/src/main/java/org/apache/samza/storage/kv/ClosableIterator.java
new file mode 100644
index 0000000..cc215e6
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/ClosableIterator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import java.util.Iterator;
+
+/**
+ * An {@link Iterator} that holds resources which must be released by calling {@link #close()}.
+ *
+ * <p>
+ * Implement {@link #close()} to free resources assigned to the iterator such as open file handles, persistent
+ * state etc.
+ *
+ * <p>
+ * This interface extends {@link AutoCloseable} so instances can be managed by a try-with-resources statement.
+ * {@link #close()} is re-declared without checked exceptions, so callers do not need a {@code catch} clause.
+ *
+ * @param <V> the type of value returned by this iterator
+ */
+public interface ClosableIterator<V> extends Iterator<V>, AutoCloseable {
+
+  /**
+   * Closes this iterator and frees resources assigned to it.
+   *
+   * <p>
+   * It is illegal to invoke {@link #next()} and {@link #hasNext()} after an iterator has been closed.
+   */
+  @Override
+  void close();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java
new file mode 100644
index 0000000..4ed73aa
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.operators.impl.store;
+
+/**
+ * The key under which a single row is stored in a {@link TimeSeriesStore}.
+ *
+ * <p>
+ * A row is identified by the combination of the user-provided key, the row's timestamp and a sequence number.
+ * All fields are final.
+ *
+ * @param <K> the type of the user-provided key
+ */
+public class TimeSeriesKey<K> {
+
+  // serialization format version, kept for backwards compatibility
+  private static final byte VERSION = 0x00;
+
+  private final K key;
+  private final long timestamp;
+  private final long seqNum;
+
+  /**
+   * Creates a key.
+   *
+   * @param k the user-provided key; {@code null} is permitted
+   * @param time the timestamp of the row
+   * @param seq the sequence number distinguishing rows with the same key and timestamp
+   */
+  public TimeSeriesKey(K k, long time, long seq) {
+    this.key = k;
+    this.timestamp = time;
+    this.seqNum = seq;
+  }
+
+  public K getKey() {
+    return key;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public long getSeqNum() {
+    return seqNum;
+  }
+
+  public byte getVersion() {
+    return VERSION;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    TimeSeriesKey<?> other = (TimeSeriesKey<?>) o;
+    // cheap primitive comparisons first, then the (possibly null) user key
+    return timestamp == other.timestamp
+        && seqNum == other.seqNum
+        && (key == null ? other.key == null : key.equals(other.key));
+  }
+
+  @Override
+  public int hashCode() {
+    int h = (key == null) ? 0 : key.hashCode();
+    h = 31 * h + Long.hashCode(timestamp);
+    h = 31 * h + Long.hashCode(seqNum);
+    return h;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("TimeSeriesKey {key: %s timestamp: %s seqNum: %s}", key, timestamp, seqNum);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java
new file mode 100644
index 0000000..273c40a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.operators.impl.store;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.Serde;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link Serde} for {@link TimeSeriesKey}s.
+ *
+ * <p>
+ * This wraps the actual key's serde with serializers for timestamp, version number and sequence number.
+ *
+ * <p>
+ * A {@link TimeSeriesKeySerde} serializes a key as follows:
+ * <pre>
+ * +-------------------------+------------------+----------------+------------------+
+ * | serialized-key bytes    | timestamp        | version (0)    | seqNum           |
+ * |(serialized by keySerde) |                  |                |                  |
+ * +-------------------------+------------------+----------------+------------------+
+ * +---serialized key len----+-------8 bytes----+---1 byte-------+---7 bytes--------+
+ * </pre>
+ * The version byte and the 7-byte sequence number share a single 8-byte big-endian long. The version occupies the
+ * most significant byte, and only the low 56 bits of the sequence number are stored.
+ *
+ * <p>
+ * A {@code null} key is written as zero key bytes without invoking the wrapped serde. Zero key bytes are read
+ * back as a {@code null} key. Consequently, a non-null key that the wrapped serde encodes as zero bytes (e.g. an
+ * empty string with a typical string serde) is also deserialized as {@code null}.
+ *
+ * @param <K> the type of the wrapped key
+ */
+public class TimeSeriesKeySerde<K> implements Serde<TimeSeriesKey<K>> {
+
+  // low 56 bits hold the sequence number; the top byte is reserved for the version
+  private static final long SEQUENCE_NUM_MASK = 0x00ffffffffffffffL;
+  private static final int TIMESTAMP_SIZE = 8;
+  // size of the long that holds both the 1-byte version and the 7-byte sequence number
+  private static final int SEQNUM_SIZE = 8;
+
+  private final Serde<K> keySerde;
+
+  /**
+   * Creates a serde for {@link TimeSeriesKey}s.
+   *
+   * @param keySerde the serde used for the wrapped user key
+   */
+  public TimeSeriesKeySerde(Serde<K> keySerde) {
+    this.keySerde = keySerde;
+  }
+
+  @Override
+  public byte[] toBytes(TimeSeriesKey<K> timeSeriesKey) {
+    K key = timeSeriesKey.getKey();
+    long timestamp = timeSeriesKey.getTimestamp();
+    long seqNum = timeSeriesKey.getSeqNum();
+
+    // Do not hand a null key to the wrapped serde; it may not accept null. A null key is encoded as zero key
+    // bytes, which fromBytes maps back to null.
+    byte[] serializedKey = key == null ? null : keySerde.toBytes(key);
+    int keySize = serializedKey == null ? 0 : serializedKey.length;
+
+    // append the timestamp and the (version | sequence number) long to the serialized key bytes
+    ByteBuffer buf = ByteBuffer.allocate(keySize + TIMESTAMP_SIZE + SEQNUM_SIZE);
+    if (serializedKey != null) {
+      buf.put(serializedKey);
+    }
+    buf.putLong(timestamp);
+    // clearing the top byte writes version 0
+    buf.putLong(seqNum & SEQUENCE_NUM_MASK);
+
+    return buf.array();
+  }
+
+  @Override
+  public TimeSeriesKey<K> fromBytes(byte[] timeSeriesKeyBytes) {
+    // The trailing timestamp and version/seqNum longs are always present; anything shorter is corrupt input.
+    int keySize = timeSeriesKeyBytes.length - TIMESTAMP_SIZE - SEQNUM_SIZE;
+    if (keySize < 0) {
+      throw new SamzaException(String.format(
+          "Serialized TimeSeriesKey has %d bytes; expected at least %d",
+          timeSeriesKeyBytes.length, TIMESTAMP_SIZE + SEQNUM_SIZE));
+    }
+
+    // First obtain the key bytes and deserialize them, then read the timestamp and sequence number.
+    ByteBuffer buf = ByteBuffer.wrap(timeSeriesKeyBytes);
+    K key = null;
+    // zero key bytes encode a null key (see toBytes)
+    if (keySize != 0) {
+      byte[] keyBytes = new byte[keySize];
+      buf.get(keyBytes);
+      key = keySerde.fromBytes(keyBytes);
+    }
+
+    long timeStamp = buf.getLong();
+    long seqNum = buf.getLong();
+    // the most significant byte holds the format version, which must be zero
+    long version = seqNum & ~SEQUENCE_NUM_MASK;
+    if (version != 0) {
+      throw new SamzaException("Version is not zero. Sequence number: " + seqNum);
+    }
+    return new TimeSeriesKey<>(key, timeStamp, seqNum);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
new file mode 100644
index 0000000..e544e2e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.operators.impl.store;
+
+import org.apache.samza.storage.kv.ClosableIterator;
+
+/**
+ * A key-value store that allows entries to be queried and stored based on time ranges.
+ *
+ * <p>
+ * Operations on the store can be invoked from multiple threads. Hence, implementations are expected to be
+ * thread-safe.
+ *
+ * @param <K> the type of key in the store
+ * @param <V> the type of value in the store
+ */
+public interface TimeSeriesStore<K, V> {
+
+  /**
+   * Inserts a key and the value in the store with the provided timestamp.
+   *
+   * @param key the key to insert
+   * @param val the value to insert
+   * @param timestamp the timestamp in milliseconds
+   */
+  void put(K key, V val, long timestamp);
+
+  /**
+   * Returns an iterator over values for the given key in the time range
+   * [{@code startTimestamp}, {@code endTimestamp}).
+   *
+   * <p>
+   * Values returned by the iterator are ordered by their timestamp. Values with the same timestamp are
+   * returned in their order of insertion.
+   *
+   * <p>
+   * The returned iterator <b>must</b> be closed after use by calling {@link ClosableIterator#close()}.
+   * Not doing so will result in resource leaks.
+   *
+   * @param key the key to look up in the store
+   * @param startTimestamp the start timestamp of the range, inclusive
+   * @param endTimestamp the end timestamp of the range, exclusive
+   * @return an iterator over the matching values and their timestamps; the caller must close it
+   * @throws IllegalArgumentException when {@code startTimestamp > endTimestamp}, or when either of them is
+   *     negative
+   */
+  ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, long endTimestamp);
+
+  /**
+   * Removes all values for this key in the time range [{@code startTimestamp}, {@code endTimestamp}).
+   *
+   * @param key the key to look up in the store
+   * @param startTimestamp the start timestamp of the range, inclusive
+   * @param endTimestamp the end timestamp of the range, exclusive
+   * @throws IllegalArgumentException when {@code startTimestamp > endTimestamp}, or when either of them is
+   *     negative
+   */
+  void remove(K key, long startTimestamp, long endTimestamp);
+
+  /**
+   * Flushes this time series store, if applicable.
+   */
+  void flush();
+
+  /**
+   * Closes this store.
+   *
+   * <p>
+   * Use this to perform final clean-ups, release acquired resources etc.
+   */
+  void close();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
new file mode 100644
index 0000000..5e35219
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.operators.impl.store;
+
+import org.apache.samza.storage.kv.ClosableIterator;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterator;
+import org.apache.samza.storage.kv.KeyValueStore;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Provides a view on top of a {@link KeyValueStore} that allows retrieval of entries by time ranges.
+ *
+ * <p>
+ * A {@link TimeSeriesStoreImpl} can be backed by persistent stores like rocksDB, in-memory stores, change-logged
+ * stores, cached stores (or any combination of these).
+ *
+ * <p>
+ * Range iterators in the store return values in the order of their timestamp. Within the same key and timestamp,
+ * values are returned in their order of insertion.
+ *
+ * <p>
+ * This store has two modes of operation depending on how duplicates are handled:
+ * <ol>
+ * <li>
+ * Overwrite Mode: In this mode, the store only retains the most recent value for a given key and timestamp. I.e.,Calling
+ * {@link #put} on an existing key and timestamp will overwrite the previously stored value for that key.
+ * </li>
+ * <li>
+ * Append Mode: In this mode, the store retains all previous values for a given key and timestamp. I.e., Calling {@link #put}
+ * with an existing key and timestamp will append the value to the list.
+ * </li>
+ * </ol>
+ * <p>
+ * Implementation Notes:
+ *
+ * Data is serialized and organized into K-V pairs as follows:
+ * <pre>
+ * +-----------------------+------------------+------------+------------------------+-----------------------+
+ * | serialized-key bytes | timestamp | version | sequence number | serialized value |
+ * | | | | | |
+ * +-----------------------+------------------+------------+------------------------+-----------------------+
+ * +----------------------+--------8 bytes----+----1 bytes-+---------7 bytes--------+----value size----------
+ * +----------------------------------STORE KEY-------------------------------------+---STORE VAL-----------+
+ * </pre>
+ * An 8 byte timestamp, a one byte version and a 7 byte sequence number are appended to the provided key and this
+ * combination is used as the key in the k-v store. The provided value is stored as is.
+ *
+ * <p> This class is thread-safe and concurrent reads/writes are expected.
+ *
+ * @param <K> the type of key in the store
+ * @param <V> the type of value in the store
+ */
+public class TimeSeriesStoreImpl<K, V> implements TimeSeriesStore<K, V> {
+
+ private final KeyValueStore<TimeSeriesKey<K>, V> kvStore;
+
+ /**
+ * Since timestamps are at the granularity of milliseconds, multiple entries added in the same
+ * millisecond are distinguished by a monotonically increasing sequence number.
+ */
+ private final AtomicLong seqNum = new AtomicLong();
+ private final boolean appendMode;
+
+ /**
+ * Creates a {@link TimeSeriesStoreImpl}
+ *
+ * @param kvStore the backing kv store to use
+ * @param appendMode should the store be used in appendMode
+ */
+ public TimeSeriesStoreImpl(KeyValueStore<TimeSeriesKey<K>, V> kvStore, boolean appendMode) {
+ this.kvStore = kvStore;
+ this.appendMode = appendMode;
+ }
+
+ /**
+ * Creates a {@link TimeSeriesStoreImpl} in append mode.
+ *
+ * @param kvStore the backing kv store to use
+ */
+ public TimeSeriesStoreImpl(KeyValueStore<TimeSeriesKey<K>, V> kvStore) {
+ this(kvStore, true);
+ }
+
+ @Override
+ public void put(K key, V val, long timestamp) {
+ // For append mode, values are differentiated by an unique sequence number. For overwrite mode, the sequence
+ // number is always zero. This ensures that only the most recent value is retained.
+ if (appendMode) {
+ seqNum.getAndIncrement();
+ }
+ TimeSeriesKey<K> timeSeriesKey = new TimeSeriesKey<>(key, timestamp, seqNum.get());
+ kvStore.put(timeSeriesKey, val);
+ }
+
+ @Override
+ public ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, long endTimestamp) {
+ validateRange(startTimestamp, endTimestamp);
+ TimeSeriesKey<K> fromKey = new TimeSeriesKey(key, startTimestamp, 0);
+ TimeSeriesKey<K> toKey = new TimeSeriesKey(key, endTimestamp, 0);
+
+ KeyValueIterator<TimeSeriesKey<K>, V> range = kvStore.range(fromKey, toKey);
+ return new TimeSeriesStoreIterator<>(range);
+ }
+
+ @Override
+ public void remove(K key, long startTimestamp, long endTimeStamp) {
+ validateRange(startTimestamp, endTimeStamp);
+ TimeSeriesKey<K> fromKey = new TimeSeriesKey(key, startTimestamp, 0);
+ TimeSeriesKey<K> toKey = new TimeSeriesKey(key, endTimeStamp, 0);
+
+ List<TimeSeriesKey<K>> keysToDelete = new LinkedList<>();
+
+ KeyValueIterator<TimeSeriesKey<K>, V> range = kvStore.range(fromKey, toKey);
+ while (range.hasNext()) {
+ keysToDelete.add(range.next().getKey());
+ }
+
+ kvStore.deleteAll(keysToDelete);
+ }
+
+ @Override
+ public void flush() {
+ kvStore.flush();
+ }
+
+ @Override
+ public void close() {
+ kvStore.close();
+ }
+
+ private void validateRange(long startTimestamp, long endTimestamp) throws IllegalArgumentException {
+ if (startTimestamp < 0) {
+ throw new IllegalArgumentException(String.format("Start timestamp :%d is less than zero", startTimestamp));
+ }
+
+ if (endTimestamp < 0) {
+ throw new IllegalArgumentException(String.format("End timestamp :%d is less than zero", endTimestamp));
+ }
+
+ if (endTimestamp < startTimestamp) {
+ throw new IllegalArgumentException(String.format("End timestamp :%d is less than start timestamp: %d", endTimestamp, startTimestamp));
+ }
+ }
+
+ private static class TimeSeriesStoreIterator<K, V> implements ClosableIterator<TimestampedValue<V>> {
+
+ private final KeyValueIterator<TimeSeriesKey<K>, V> wrappedIterator;
+
+ public TimeSeriesStoreIterator(KeyValueIterator<TimeSeriesKey<K>, V> wrappedIterator) {
+ this.wrappedIterator = wrappedIterator;
+ }
+
+ @Override
+ public void close() {
+ wrappedIterator.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return wrappedIterator.hasNext();
+ }
+
+ @Override
+ public TimestampedValue<V> next() {
+ Entry<TimeSeriesKey<K>, V> next = wrappedIterator.next();
+ return new TimestampedValue<>(next.getValue(), next.getKey().getTimestamp());
+ }
+
+ @Override
+ public void remove() {
+ wrappedIterator.remove();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
new file mode 100644
index 0000000..ad5e844
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.operators.impl.store;
+
+/**
+ * An immutable pair of a value, and its corresponding timestamp.
+ *
+ * @param <V> the type of the value
+ */
+public class TimestampedValue<V> {
+  private final V value;
+  private final Long timestamp;
+
+  /**
+   * Creates a pair.
+   *
+   * @param v the value; {@code null} is permitted
+   * @param time the timestamp associated with the value; {@code null} is permitted
+   */
+  public TimestampedValue(V v, Long time) {
+    value = v;
+    timestamp = time;
+  }
+
+  public V getValue() {
+    return value;
+  }
+
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || !getClass().equals(o.getClass())) {
+      return false;
+    }
+
+    TimestampedValue<?> that = (TimestampedValue<?>) o;
+
+    if (value != null ? !value.equals(that.value) : that.value != null) {
+      return false;
+    }
+    // timestamp is a boxed Long and may be null; compare it null-safely rather than dereferencing it
+    return timestamp != null ? timestamp.equals(that.timestamp) : that.timestamp == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = value != null ? value.hashCode() : 0;
+    result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("TimestampedValue {value: %s timestamp: %s}", value, timestamp);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesKeySerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesKeySerde.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesKeySerde.java
new file mode 100644
index 0000000..3c1df9c
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesKeySerde.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.operators.impl.store;
+
+import org.apache.samza.serializers.LongSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+
+public class TestTimeSeriesKeySerde {
+
+  @Test
+  public void testStringTimeSeriesKey() {
+    assertRoundTrip(new TimeSeriesKeySerde<String>(new StringSerde("UTF-8")),
+        new TimeSeriesKey<String>("test", 1, 23));
+  }
+
+  @Test
+  public void testNullTimeSeriesKey() {
+    assertRoundTrip(new TimeSeriesKeySerde<String>(new StringSerde("UTF-8")),
+        new TimeSeriesKey<String>(null, 1, 23));
+  }
+
+  @Test
+  public void testLongTimeSeriesKey() {
+    assertRoundTrip(new TimeSeriesKeySerde<Long>(new LongSerde()),
+        new TimeSeriesKey<Long>(30L, 1, 23));
+  }
+
+  /**
+   * Serializes {@code original} with {@code serde}, deserializes the bytes again, and asserts that each field,
+   * as well as the key as a whole, survived the round trip.
+   */
+  private static <K> void assertRoundTrip(TimeSeriesKeySerde<K> serde, TimeSeriesKey<K> original) {
+    TimeSeriesKey<K> roundTripped = serde.fromBytes(serde.toBytes(original));
+
+    assertEquals(original.getKey(), roundTripped.getKey());
+    assertEquals(original.getSeqNum(), roundTripped.getSeqNum());
+    assertEquals(original.getTimestamp(), roundTripped.getTimestamp());
+    assertEquals(original, roundTripped);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/56d564c6/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
new file mode 100644
index 0000000..62304f3
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.operators.impl.store;
+
+import com.google.common.io.Files;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.ByteSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.ClosableIterator;
+import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+import org.apache.samza.storage.kv.RocksDbKeyValueStore;
+import org.apache.samza.storage.kv.SerializedKeyValueStore;
+import org.apache.samza.storage.kv.SerializedKeyValueStoreMetrics;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.rocksdb.CompressionType;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestTimeSeriesStoreImpl {
+
+  /** RocksDB stores opened by the current test; closed in {@link #tearDown()}. */
+  private final List<RocksDbKeyValueStore> openStores = new ArrayList<>();
+
+  /** Temporary directories created for the current test's stores; deleted in {@link #tearDown()}. */
+  private final List<File> tempDirs = new ArrayList<>();
+
+  /**
+   * Closes every RocksDB store opened by the test and removes its backing directory,
+   * so tests do not leak native handles or temp files.
+   */
+  @After
+  public void tearDown() {
+    openStores.forEach(RocksDbKeyValueStore::close);
+    openStores.clear();
+    tempDirs.forEach(TestTimeSeriesStoreImpl::deleteRecursively);
+    tempDirs.clear();
+  }
+
+  @Test
+  public void testGetOnTimestampBoundaries() {
+    TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), true);
+
+    // insert "world-1" at timestamp 1, then "world-1" and "world-2" at timestamp 2
+    timeSeriesStore.put("hello", utf8Bytes("world-1"), 1L);
+    timeSeriesStore.put("hello", utf8Bytes("world-1"), 2L);
+    timeSeriesStore.put("hello", utf8Bytes("world-2"), 2L);
+
+    // read from time-range [0,1) should return no entries
+    List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 0L, 1L);
+    Assert.assertEquals(0, values.size());
+
+    // read from time-range [1,2) should return one entry
+    values = readStore(timeSeriesStore, "hello", 1L, 2L);
+    Assert.assertEquals(1, values.size());
+    Assert.assertEquals("world-1", utf8String(values.get(0).getValue()));
+
+    // read from time-range [2,3) should return two entries, in insertion order
+    values = readStore(timeSeriesStore, "hello", 2L, 3L);
+    Assert.assertEquals(2, values.size());
+    Assert.assertEquals("world-1", utf8String(values.get(0).getValue()));
+    Assert.assertEquals(Long.valueOf(2L), values.get(0).getTimestamp());
+    Assert.assertEquals("world-2", utf8String(values.get(1).getValue()));
+
+    // read from time-range [0,3) should return three entries
+    values = readStore(timeSeriesStore, "hello", 0L, 3L);
+    Assert.assertEquals(3, values.size());
+
+    // read from time-range [2,999999) should return two entries
+    values = readStore(timeSeriesStore, "hello", 2L, 999999L);
+    Assert.assertEquals(2, values.size());
+
+    // read from time-range [3,4) should return no entries
+    values = readStore(timeSeriesStore, "hello", 3L, 4L);
+    Assert.assertEquals(0, values.size());
+  }
+
+  @Test
+  public void testGetWithNonExistentKeys() {
+    TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), true);
+    timeSeriesStore.put("hello", utf8Bytes("world-1"), 1L);
+
+    // read from a non-existent key
+    List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "non-existent-key", 0, Integer.MAX_VALUE);
+    Assert.assertEquals(0, values.size());
+
+    // read from an existing key but out of range timestamp
+    values = readStore(timeSeriesStore, "hello", 2, Integer.MAX_VALUE);
+    Assert.assertEquals(0, values.size());
+  }
+
+  @Test
+  public void testPutWithMultipleEntries() {
+    TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), true);
+
+    // insert 100 entries at each of timestamps 1 and 2
+    for (int i = 0; i < 100; i++) {
+      timeSeriesStore.put("hello", utf8Bytes("world-1"), 1L);
+      timeSeriesStore.put("hello", utf8Bytes("world-2"), 2L);
+    }
+
+    // read from time-range [0,2) should return 100 entries
+    List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 0L, 2L);
+    Assert.assertEquals(100, values.size());
+    values.forEach(timeSeriesValue ->
+        Assert.assertEquals("world-1", utf8String(timeSeriesValue.getValue())));
+
+    // read from time-range [2,4) should return 100 entries
+    values = readStore(timeSeriesStore, "hello", 2L, 4L);
+    Assert.assertEquals(100, values.size());
+    values.forEach(timeSeriesValue ->
+        Assert.assertEquals("world-2", utf8String(timeSeriesValue.getValue())));
+
+    // read all entries in the store
+    values = readStore(timeSeriesStore, "hello", 0L, Integer.MAX_VALUE);
+    Assert.assertEquals(200, values.size());
+  }
+
+  @Test
+  public void testGetOnTimestampBoundariesWithOverwriteMode() {
+    // instantiate a store in overwrite mode
+    TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), false);
+
+    // insert "world-1" at timestamp 1, then "world-1" and "world-2" at timestamp 2
+    timeSeriesStore.put("hello", utf8Bytes("world-1"), 1L);
+    timeSeriesStore.put("hello", utf8Bytes("world-1"), 2L);
+    timeSeriesStore.put("hello", utf8Bytes("world-2"), 2L);
+
+    // read from time-range [0,1) should return no entries
+    List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 0L, 1L);
+    Assert.assertEquals(0, values.size());
+
+    // read from time-range [1,2) should return one entry
+    values = readStore(timeSeriesStore, "hello", 1L, 2L);
+    Assert.assertEquals(1, values.size());
+    Assert.assertEquals("world-1", utf8String(values.get(0).getValue()));
+
+    // read from time-range [2,3) should return the most recent entry
+    values = readStore(timeSeriesStore, "hello", 2L, 3L);
+    Assert.assertEquals(1, values.size());
+    Assert.assertEquals("world-2", utf8String(values.get(0).getValue()));
+    Assert.assertEquals(Long.valueOf(2L), values.get(0).getTimestamp());
+
+    // read from time-range [0,3) should return two entries
+    values = readStore(timeSeriesStore, "hello", 0L, 3L);
+    Assert.assertEquals(2, values.size());
+
+    // read from time-range [2,999999) should return one entry
+    values = readStore(timeSeriesStore, "hello", 2L, 999999L);
+    Assert.assertEquals(1, values.size());
+
+    // read from time-range [3,4) should return no entries
+    values = readStore(timeSeriesStore, "hello", 3L, 4L);
+    Assert.assertEquals(0, values.size());
+  }
+
+  @Test
+  public void testDeletesInOverwriteMode() {
+    // instantiate a store in overwrite mode
+    TimeSeriesStore<String, byte[]> timeSeriesStore = newTimeSeriesStore("my-store", new StringSerde("UTF-8"), false);
+
+    // insert "world-1" at timestamp 1, then "world-1" and "world-2" at timestamp 2
+    timeSeriesStore.put("hello", utf8Bytes("world-1"), 1L);
+    timeSeriesStore.put("hello", utf8Bytes("world-1"), 2L);
+    timeSeriesStore.put("hello", utf8Bytes("world-2"), 2L);
+
+    List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 1L, 3L);
+    Assert.assertEquals(2, values.size());
+
+    timeSeriesStore.remove("hello", 0L, 3L);
+    values = readStore(timeSeriesStore, "hello", 1L, 3L);
+    Assert.assertEquals(0, values.size());
+  }
+
+  /**
+   * Drains every value stored under {@code key} in [startTimestamp, endTimestamp) into a list.
+   * The store iterator is always closed, even if iteration fails.
+   */
+  private static <K, V> List<TimestampedValue<V>> readStore(TimeSeriesStore<K, V> store, K key, long startTimestamp, long endTimestamp) {
+    List<TimestampedValue<V>> list = new ArrayList<>();
+    ClosableIterator<TimestampedValue<V>> storeValuesIterator = store.get(key, startTimestamp, endTimestamp);
+    try {
+      storeValuesIterator.forEachRemaining(list::add);
+    } finally {
+      storeValuesIterator.close();
+    }
+    return list;
+  }
+
+  /**
+   * Creates a {@link TimeSeriesStore} backed by a fresh RocksDB store named {@code storeName}.
+   *
+   * @param storeName name used for the underlying RocksDB store and its metrics
+   * @param keySerde serde for the user-level key
+   * @param appendMode passed through to {@link TimeSeriesStoreImpl}; the tests call it with {@code true}
+   *     for append mode and {@code false} for overwrite mode
+   */
+  private <K> TimeSeriesStore<K, byte[]> newTimeSeriesStore(String storeName, Serde<K> keySerde, boolean appendMode) {
+    RocksDbKeyValueStore rocksKVStore = newRocksDbStore(storeName);
+    SerializedKeyValueStore<TimeSeriesKey<K>, byte[]> kvStore = new SerializedKeyValueStore<>(rocksKVStore,
+        new TimeSeriesKeySerde<>(keySerde), new ByteSerde(),
+        new SerializedKeyValueStoreMetrics(storeName, new MetricsRegistryMap()));
+    return new TimeSeriesStoreImpl<>(kvStore, appendMode);
+  }
+
+  /** Opens a RocksDB store in a new temp directory and registers both for cleanup in {@link #tearDown()}. */
+  private RocksDbKeyValueStore newRocksDbStore(String storeName) {
+    File dir = Files.createTempDir();
+    tempDirs.add(dir);
+    RocksDbKeyValueStore store = new RocksDbKeyValueStore(dir,
+        new Options().setCreateIfMissing(true).setCompressionType(CompressionType.SNAPPY_COMPRESSION), new MapConfig(),
+        false, storeName, new WriteOptions(), new FlushOptions(),
+        new KeyValueStoreMetrics(storeName, new MetricsRegistryMap()));
+    openStores.add(store);
+    return store;
+  }
+
+  /** Encodes test values with an explicit charset rather than the platform default. */
+  private static byte[] utf8Bytes(String value) {
+    return value.getBytes(StandardCharsets.UTF_8);
+  }
+
+  /** Decodes stored values with the same charset used by {@link #utf8Bytes(String)}. */
+  private static String utf8String(byte[] value) {
+    return new String(value, StandardCharsets.UTF_8);
+  }
+
+  /** Best-effort recursive delete of a temp directory; failures are ignored because this is only test cleanup. */
+  private static void deleteRecursively(File file) {
+    File[] children = file.listFiles();
+    if (children != null) {
+      for (File child : children) {
+        deleteRecursively(child);
+      }
+    }
+    file.delete();
+  }
+}