You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/07/29 08:00:59 UTC

[GitHub] [samza] bkonold commented on a change in pull request #1404: SAMZA-2562: [Scala cleanup] Clean up Scala in samza-kv-inmemory and samza-kv-couchbase

bkonold commented on a change in pull request #1404:
URL: https://github.com/apache/samza/pull/1404#discussion_r461230217



##########
File path: samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
##########
@@ -19,66 +19,501 @@
 
 package org.apache.samza.storage.kv.inmemory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import com.google.common.collect.Iterators;
 import com.google.common.primitives.Ints;
-import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.Counter;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueSnapshot;
 import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
 
 public class TestInMemoryKeyValueStore {
+  private static final String DEFAULT_KEY_PREFIX = "key_prefix";
+  private static final String OTHER_KEY_PREFIX = "other_key_prefix";
+  /**
+   * Keep the lengths of the values longer so that metrics validations for key and value sizes don't collide.
+   */
+  private static final String DEFAULT_VALUE_PREFIX = "value_prefix_value_prefix";
+  private static final String OTHER_VALUE_PREFIX = "other_value_prefix_value_prefix";
+
+  @Mock
+  private KeyValueStoreMetrics keyValueStoreMetrics;
+  @Mock
+  private Counter getsCounter;
+  @Mock
+  private Counter bytesReadCounter;
+  @Mock
+  private Counter putsCounter;
+  @Mock
+  private Counter bytesWrittenCounter;
+  @Mock
+  private Counter deletesCounter;
+
+  private InMemoryKeyValueStore inMemoryKeyValueStore;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    when(this.keyValueStoreMetrics.gets()).thenReturn(this.getsCounter);
+    when(this.keyValueStoreMetrics.bytesRead()).thenReturn(this.bytesReadCounter);
+    when(this.keyValueStoreMetrics.puts()).thenReturn(this.putsCounter);
+    when(this.keyValueStoreMetrics.bytesWritten()).thenReturn(this.bytesWrittenCounter);
+    when(this.keyValueStoreMetrics.deletes()).thenReturn(this.deletesCounter);
+    this.inMemoryKeyValueStore = new InMemoryKeyValueStore(this.keyValueStoreMetrics);
+  }
+
+  @Test
+  public void testGet() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
+    verify(this.getsCounter, times(2)).inc();
+    verify(this.bytesReadCounter).inc(value(0).length);
+    verify(this.bytesReadCounter).inc(value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testGetEmpty() {
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.getsCounter).inc();
+    verifyZeroInteractions(this.bytesReadCounter);
+  }
+
+  @Test
+  public void testGetAfterDelete() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.delete(key(0));
+
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.getsCounter).inc();
+    verifyZeroInteractions(this.bytesReadCounter);
+  }
+
+  @Test
+  public void testPut() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+    verify(this.bytesWrittenCounter).inc(key(OTHER_KEY_PREFIX, 1).length + value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testPutExistingEntry() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(0), value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testPutEmpty() {
+    byte[] emptyValue = new byte[0];
+    this.inMemoryKeyValueStore.put(key(0), emptyValue);
+
+    assertEquals(0, this.inMemoryKeyValueStore.get(key(0)).length);
+    verify(this.putsCounter).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length);
+  }

Review comment:
       is there an edge case this is intended to test that `testPut` doesn't cover?

##########
File path: samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
##########
@@ -19,66 +19,501 @@
 
 package org.apache.samza.storage.kv.inmemory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import com.google.common.collect.Iterators;
 import com.google.common.primitives.Ints;
-import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.Counter;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueSnapshot;
 import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
 
 public class TestInMemoryKeyValueStore {
+  private static final String DEFAULT_KEY_PREFIX = "key_prefix";
+  private static final String OTHER_KEY_PREFIX = "other_key_prefix";
+  /**
+   * Keep the lengths of the values longer so that metrics validations for key and value sizes don't collide.
+   */
+  private static final String DEFAULT_VALUE_PREFIX = "value_prefix_value_prefix";
+  private static final String OTHER_VALUE_PREFIX = "other_value_prefix_value_prefix";
+
+  @Mock
+  private KeyValueStoreMetrics keyValueStoreMetrics;
+  @Mock
+  private Counter getsCounter;
+  @Mock
+  private Counter bytesReadCounter;
+  @Mock
+  private Counter putsCounter;
+  @Mock
+  private Counter bytesWrittenCounter;
+  @Mock
+  private Counter deletesCounter;
+
+  private InMemoryKeyValueStore inMemoryKeyValueStore;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    when(this.keyValueStoreMetrics.gets()).thenReturn(this.getsCounter);
+    when(this.keyValueStoreMetrics.bytesRead()).thenReturn(this.bytesReadCounter);
+    when(this.keyValueStoreMetrics.puts()).thenReturn(this.putsCounter);
+    when(this.keyValueStoreMetrics.bytesWritten()).thenReturn(this.bytesWrittenCounter);
+    when(this.keyValueStoreMetrics.deletes()).thenReturn(this.deletesCounter);
+    this.inMemoryKeyValueStore = new InMemoryKeyValueStore(this.keyValueStoreMetrics);
+  }
+
+  @Test
+  public void testGet() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
+    verify(this.getsCounter, times(2)).inc();
+    verify(this.bytesReadCounter).inc(value(0).length);
+    verify(this.bytesReadCounter).inc(value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testGetEmpty() {
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.getsCounter).inc();
+    verifyZeroInteractions(this.bytesReadCounter);
+  }
+
+  @Test
+  public void testGetAfterDelete() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.delete(key(0));
+
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.getsCounter).inc();
+    verifyZeroInteractions(this.bytesReadCounter);
+  }
+
+  @Test
+  public void testPut() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+    verify(this.bytesWrittenCounter).inc(key(OTHER_KEY_PREFIX, 1).length + value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testPutExistingEntry() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(0), value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testPutEmpty() {
+    byte[] emptyValue = new byte[0];
+    this.inMemoryKeyValueStore.put(key(0), emptyValue);
+
+    assertEquals(0, this.inMemoryKeyValueStore.get(key(0)).length);
+    verify(this.putsCounter).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length);
+  }
+
+  @Test
+  public void testPutNull() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(0), null);
+
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.deletesCounter).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+  }
+
+  @Test
+  public void testPutAll() {
+    List<Entry<byte[], byte[]>> entries = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      entries.add(new Entry<>(key(i), value(i)));
+    }
+    this.inMemoryKeyValueStore.putAll(entries);
+
+    for (int i = 0; i < 10; i++) {
+      assertArrayEquals(value(i), this.inMemoryKeyValueStore.get(key(i)));
+    }
+    verify(this.putsCounter, times(10)).inc();
+    verify(this.bytesWrittenCounter, times(10)).inc(key(0).length + value(0).length);
+  }
+
+  @Test
+  public void testPutAllUpdate() {
+    // check that an existing value is overridden
+    this.inMemoryKeyValueStore.put(key(0), value(1234));
+    List<Entry<byte[], byte[]>> entries = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      entries.add(new Entry<>(key(i), value(i)));
+    }
+    this.inMemoryKeyValueStore.putAll(entries);
+
+    for (int i = 0; i < 10; i++) {
+      assertArrayEquals(value(i), this.inMemoryKeyValueStore.get(key(i)));
+    }
+    // 1 time for initial value to be overridden, 10 times for "regular" puts
+    verify(this.putsCounter, times(11)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(1234).length);
+    verify(this.bytesWrittenCounter, times(10)).inc(key(0).length + value(0).length);

Review comment:
       doesn't this assume that `key(0)` and `value(0)` lengths are the same as `key(i)` and `value(i)` from the loop? can we re-write this so it more obviously matches the 10 writes that happen in the loop or add a comment indicating this assumption? or perhaps define a constant that represents the length of an integer key / value... just to make this more readable.
   
   same comment applies to other occurrences of loops in your tests

##########
File path: samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.inmemory;
+
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedBytes;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterator;
+import org.apache.samza.storage.kv.KeyValueSnapshot;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+
+
+/**
+ * In-memory implementation of a {@link KeyValueStore}.
+ *
+ * This uses a {@link ConcurrentSkipListMap} to store the keys in order.
+ */
+public class InMemoryKeyValueStore implements KeyValueStore<byte[], byte[]> {
+  private final KeyValueStoreMetrics metrics;
+  private final ConcurrentSkipListMap<byte[], byte[]> underlying;
+
+  /**
+   * @param metrics A metrics instance to publish key-value store related statistics
+   */
+  public InMemoryKeyValueStore(KeyValueStoreMetrics metrics) {
+    this.metrics = metrics;
+    this.underlying = new ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator());
+  }
+
+  @Override
+  public byte[] get(byte[] key) {
+    this.metrics.gets().inc();
+    Preconditions.checkArgument(key != null, "Null argument 'key' not allowed");
+    byte[] found = this.underlying.get(key);
+    if (found != null) {
+      metrics.bytesRead().inc(found.length);
+    }
+    return found;
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) {
+    this.metrics.puts().inc();
+    Preconditions.checkArgument(key != null, "Null argument 'key' not allowed");
+    if (value == null) {
+      this.metrics.deletes().inc();
+      this.underlying.remove(key);
+    } else {
+      this.metrics.bytesWritten().inc(key.length + value.length);
+      this.underlying.put(key, value);
+    }
+  }
+
+  @Override
+  public void putAll(List<Entry<byte[], byte[]>> entries) {
+    // TreeMap's putAll requires a map, so we'd need to iterate over all the entries anyway
+    // to use it, in order to putAll here.  Therefore, just iterate here.
+    for (Entry<byte[], byte[]> next : entries) {
+      put(next.getKey(), next.getValue());
+    }
+  }
+
+  @Override
+  public void delete(byte[] key) {
+    // TODO Bug: This double counts deletes for metrics, because put also counts a delete
+    metrics.deletes().inc();
+    put(key, null);
+  }
+
+  @Override
+  public KeyValueIterator<byte[], byte[]> range(byte[] from, byte[] to) {
+    this.metrics.ranges().inc();
+    Preconditions.checkArgument(from != null, "Null argument 'from' not allowed");
+    Preconditions.checkArgument(to != null, "Null argument 'to' not allowed");
+    return new InMemoryIterator(this.underlying.subMap(from, to).entrySet().iterator(), this.metrics);
+  }
+
+  @Override
+  public KeyValueSnapshot<byte[], byte[]> snapshot(byte[] from, byte[] to) {
+    // TODO: Bug: This does not satisfy the immutability constraint, since the entrySet is backed by the underlying map.

Review comment:
       link the jira?

##########
File path: samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
##########
@@ -19,66 +19,501 @@
 
 package org.apache.samza.storage.kv.inmemory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import com.google.common.collect.Iterators;
 import com.google.common.primitives.Ints;
-import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.Counter;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueSnapshot;
 import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
 
 public class TestInMemoryKeyValueStore {
+  private static final String DEFAULT_KEY_PREFIX = "key_prefix";
+  private static final String OTHER_KEY_PREFIX = "other_key_prefix";
+  /**
+   * Keep the lengths of the values longer so that metrics validations for key and value sizes don't collide.
+   */
+  private static final String DEFAULT_VALUE_PREFIX = "value_prefix_value_prefix";
+  private static final String OTHER_VALUE_PREFIX = "other_value_prefix_value_prefix";
+
+  @Mock
+  private KeyValueStoreMetrics keyValueStoreMetrics;
+  @Mock
+  private Counter getsCounter;
+  @Mock
+  private Counter bytesReadCounter;
+  @Mock
+  private Counter putsCounter;
+  @Mock
+  private Counter bytesWrittenCounter;
+  @Mock
+  private Counter deletesCounter;
+
+  private InMemoryKeyValueStore inMemoryKeyValueStore;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    when(this.keyValueStoreMetrics.gets()).thenReturn(this.getsCounter);
+    when(this.keyValueStoreMetrics.bytesRead()).thenReturn(this.bytesReadCounter);
+    when(this.keyValueStoreMetrics.puts()).thenReturn(this.putsCounter);
+    when(this.keyValueStoreMetrics.bytesWritten()).thenReturn(this.bytesWrittenCounter);
+    when(this.keyValueStoreMetrics.deletes()).thenReturn(this.deletesCounter);
+    this.inMemoryKeyValueStore = new InMemoryKeyValueStore(this.keyValueStoreMetrics);
+  }
+
+  @Test
+  public void testGet() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
+    verify(this.getsCounter, times(2)).inc();
+    verify(this.bytesReadCounter).inc(value(0).length);
+    verify(this.bytesReadCounter).inc(value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testGetEmpty() {
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.getsCounter).inc();
+    verifyZeroInteractions(this.bytesReadCounter);
+  }
+
+  @Test
+  public void testGetAfterDelete() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.delete(key(0));
+
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.getsCounter).inc();
+    verifyZeroInteractions(this.bytesReadCounter);
+  }
+
+  @Test
+  public void testPut() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+    verify(this.bytesWrittenCounter).inc(key(OTHER_KEY_PREFIX, 1).length + value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testPutExistingEntry() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(0), value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testPutEmpty() {
+    byte[] emptyValue = new byte[0];
+    this.inMemoryKeyValueStore.put(key(0), emptyValue);
+
+    assertEquals(0, this.inMemoryKeyValueStore.get(key(0)).length);
+    verify(this.putsCounter).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length);
+  }
+
+  @Test
+  public void testPutNull() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(0), null);
+
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.deletesCounter).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+  }
+
+  @Test
+  public void testPutAll() {
+    List<Entry<byte[], byte[]>> entries = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      entries.add(new Entry<>(key(i), value(i)));
+    }
+    this.inMemoryKeyValueStore.putAll(entries);
+
+    for (int i = 0; i < 10; i++) {
+      assertArrayEquals(value(i), this.inMemoryKeyValueStore.get(key(i)));
+    }
+    verify(this.putsCounter, times(10)).inc();
+    verify(this.bytesWrittenCounter, times(10)).inc(key(0).length + value(0).length);
+  }
+
+  @Test
+  public void testPutAllUpdate() {
+    // check that an existing value is overridden
+    this.inMemoryKeyValueStore.put(key(0), value(1234));
+    List<Entry<byte[], byte[]>> entries = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      entries.add(new Entry<>(key(i), value(i)));
+    }
+    this.inMemoryKeyValueStore.putAll(entries);
+
+    for (int i = 0; i < 10; i++) {
+      assertArrayEquals(value(i), this.inMemoryKeyValueStore.get(key(i)));
+    }
+    // 1 time for initial value to be overridden, 10 times for "regular" puts
+    verify(this.putsCounter, times(11)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(1234).length);
+    verify(this.bytesWrittenCounter, times(10)).inc(key(0).length + value(0).length);
+  }
+
+  @Test
+  public void testPutAllWithNull() {
+    List<Entry<byte[], byte[]>> entries = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      entries.add(new Entry<>(key(i), value(i)));
+    }
+    this.inMemoryKeyValueStore.putAll(entries);
+
+    List<Entry<byte[], byte[]>> deleteEntries = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      deleteEntries.add(new Entry<>(key(i), null));
+    }
+    this.inMemoryKeyValueStore.putAll(deleteEntries);
+
+    for (int i = 0; i < 10; i++) {
+      if (i < 3) {
+        assertNull(this.inMemoryKeyValueStore.get(key(i)));
+      } else {
+        assertArrayEquals(value(i), this.inMemoryKeyValueStore.get(key(i)));
+      }
+    }
+    // 10 times for "regular" puts, 3 times for deletion puts
+    verify(this.putsCounter, times(13)).inc();
+    // 10 "regular" puts all have same size for key/value
+    verify(this.bytesWrittenCounter, times(10)).inc(key(0).length + value(0).length);
+    verifyNoMoreInteractions(this.bytesWrittenCounter);
+    verify(this.deletesCounter, times(3)).inc();
+  }
+
+  @Test
+  public void testDelete() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.delete(key(0));
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+
+    /*
+     * There is a bug in which deletes are double counted in metrics. This deletesCounter should only be invoked once

Review comment:
       ty for noting the bug; will make it easy to fix when tests fail once the bug is fixed 

##########
File path: samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.inmemory;
+
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedBytes;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterator;
+import org.apache.samza.storage.kv.KeyValueSnapshot;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+
+
+/**
+ * In-memory implementation of a {@link KeyValueStore}.
+ *
+ * This uses a {@link ConcurrentSkipListMap} to store the keys in order.
+ */
+public class InMemoryKeyValueStore implements KeyValueStore<byte[], byte[]> {
+  private final KeyValueStoreMetrics metrics;
+  private final ConcurrentSkipListMap<byte[], byte[]> underlying;
+
+  /**
+   * @param metrics A metrics instance to publish key-value store related statistics
+   */
+  public InMemoryKeyValueStore(KeyValueStoreMetrics metrics) {
+    this.metrics = metrics;
+    this.underlying = new ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator());
+  }
+
+  @Override
+  public byte[] get(byte[] key) {
+    this.metrics.gets().inc();
+    Preconditions.checkArgument(key != null, "Null argument 'key' not allowed");
+    byte[] found = this.underlying.get(key);
+    if (found != null) {
+      metrics.bytesRead().inc(found.length);
+    }
+    return found;
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) {
+    this.metrics.puts().inc();
+    Preconditions.checkArgument(key != null, "Null argument 'key' not allowed");
+    if (value == null) {
+      this.metrics.deletes().inc();
+      this.underlying.remove(key);
+    } else {
+      this.metrics.bytesWritten().inc(key.length + value.length);
+      this.underlying.put(key, value);
+    }
+  }
+
+  @Override
+  public void putAll(List<Entry<byte[], byte[]>> entries) {
+    // TreeMap's putAll requires a map, so we'd need to iterate over all the entries anyway
+    // to use it, in order to putAll here.  Therefore, just iterate here.
+    for (Entry<byte[], byte[]> next : entries) {
+      put(next.getKey(), next.getValue());
+    }
+  }
+
+  @Override
+  public void delete(byte[] key) {
+    // TODO Bug: This double counts deletes for metrics, because put also counts a delete

Review comment:
       link the jira?

##########
File path: samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
##########
@@ -19,66 +19,501 @@
 
 package org.apache.samza.storage.kv.inmemory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import com.google.common.collect.Iterators;
 import com.google.common.primitives.Ints;
-import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.Counter;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueSnapshot;
 import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
 
 public class TestInMemoryKeyValueStore {
+  private static final String DEFAULT_KEY_PREFIX = "key_prefix";
+  private static final String OTHER_KEY_PREFIX = "other_key_prefix";
+  /**
+   * Keep the lengths of the values longer so that metrics validations for key and value sizes don't collide.
+   */
+  private static final String DEFAULT_VALUE_PREFIX = "value_prefix_value_prefix";
+  private static final String OTHER_VALUE_PREFIX = "other_value_prefix_value_prefix";
+
+  @Mock
+  private KeyValueStoreMetrics keyValueStoreMetrics;
+  @Mock
+  private Counter getsCounter;
+  @Mock
+  private Counter bytesReadCounter;
+  @Mock
+  private Counter putsCounter;
+  @Mock
+  private Counter bytesWrittenCounter;
+  @Mock
+  private Counter deletesCounter;
+
+  private InMemoryKeyValueStore inMemoryKeyValueStore;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    when(this.keyValueStoreMetrics.gets()).thenReturn(this.getsCounter);
+    when(this.keyValueStoreMetrics.bytesRead()).thenReturn(this.bytesReadCounter);
+    when(this.keyValueStoreMetrics.puts()).thenReturn(this.putsCounter);
+    when(this.keyValueStoreMetrics.bytesWritten()).thenReturn(this.bytesWrittenCounter);
+    when(this.keyValueStoreMetrics.deletes()).thenReturn(this.deletesCounter);
+    this.inMemoryKeyValueStore = new InMemoryKeyValueStore(this.keyValueStoreMetrics);
+  }
+
+  @Test
+  public void testGet() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
+    verify(this.getsCounter, times(2)).inc();
+    verify(this.bytesReadCounter).inc(value(0).length);
+    verify(this.bytesReadCounter).inc(value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testGetEmpty() {
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.getsCounter).inc();
+    verifyZeroInteractions(this.bytesReadCounter);
+  }
+
+  @Test
+  public void testGetAfterDelete() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.delete(key(0));
+
+    assertNull(this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.getsCounter).inc();
+    verifyZeroInteractions(this.bytesReadCounter);
+  }
+
+  @Test
+  public void testPut() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(OTHER_KEY_PREFIX, 1), value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(0), this.inMemoryKeyValueStore.get(key(0)));
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), this.inMemoryKeyValueStore.get(key(OTHER_KEY_PREFIX, 1)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+    verify(this.bytesWrittenCounter).inc(key(OTHER_KEY_PREFIX, 1).length + value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testPutExistingEntry() {
+    this.inMemoryKeyValueStore.put(key(0), value(0));
+    this.inMemoryKeyValueStore.put(key(0), value(OTHER_VALUE_PREFIX, 1));
+
+    assertArrayEquals(value(OTHER_VALUE_PREFIX, 1), this.inMemoryKeyValueStore.get(key(0)));
+    verify(this.putsCounter, times(2)).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(0).length);
+    verify(this.bytesWrittenCounter).inc(key(0).length + value(OTHER_VALUE_PREFIX, 1).length);
+  }
+
+  @Test
+  public void testPutEmpty() {
+    byte[] emptyValue = new byte[0];
+    this.inMemoryKeyValueStore.put(key(0), emptyValue);
+
+    assertEquals(0, this.inMemoryKeyValueStore.get(key(0)).length);
+    verify(this.putsCounter).inc();
+    verify(this.bytesWrittenCounter).inc(key(0).length);
+  }

Review comment:
       Sorry should have been more clear. What code path is this testing that testPut isn't? Does the implementation behave differently when the length is 0?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org