You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2016/04/01 18:52:35 UTC

[1/6] accumulo git commit: ACCUMULO-4148 Native map doesn't increment counter for every cell, potentially losing some updates

Repository: accumulo
Updated Branches:
  refs/heads/1.6 0206d7845 -> 41e002d7a
  refs/heads/1.7 2b286ba56 -> f181cf6a9
  refs/heads/master 02450e4f6 -> 0dd1d6a51


ACCUMULO-4148 Native map doesn't increment counter for every cell, potentially losing some updates

The problem here is that InMemoryMap$DefaultMap increments the mutationCount or
kvCount for every key value pair in every Mutation that is passed in.  The
NativeMap, which is used by the InMemoryMap$NativeMapWrapper does not.  This
causes 2 different issues in the NativeMap.

1) When a single Mutation has duplicate key value pairs, only the last is
recorded, because they all have the same mutationCount and the earlier ones are
hidden.
2) When multiple Mutations are passed in at the same time, the mutationCount
or kvCount starts over for each Mutation.  This can also lead to hiding of key
value pairs.

The tests added here expose both the issues as well as do some asserts on
simple Mutations.  A few tweaks were made to expose information to these tests.

1) Made MemKey public, made its kvCount private and exposed it via a getter so
we can inspect directly instead of parsing the toString.  Required changing
some calls to kvCount to use the getter.

2) Added a final String to the InMemoryMap which is set during construction.  This
allows you to see what kind of SimpleMap was set up in the InMemoryMap.

Closes apache/accumulo#82

Signed-off-by: Josh Elser <el...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/41e002d7
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/41e002d7
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/41e002d7

Branch: refs/heads/1.6
Commit: 41e002d7a07415ccd2826b2db98783187b9422b3
Parents: 0206d78
Author: Michael Wall <mj...@gmail.com>
Authored: Thu Feb 25 21:20:15 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Fri Apr 1 09:46:19 2016 -0400

----------------------------------------------------------------------
 .../apache/accumulo/tserver/InMemoryMap.java    |  32 +-
 .../org/apache/accumulo/tserver/MemKey.java     |  10 +-
 .../org/apache/accumulo/tserver/NativeMap.java  |  25 +-
 .../org/apache/accumulo/test/InMemoryMapIT.java | 319 +++++++++++++++++++
 4 files changed, 361 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/41e002d7/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index 57c9440..614b34a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@ -82,7 +82,7 @@ class MemKeyComparator implements Comparator<Key>, Serializable {
     if (cmp == 0) {
       if (k1 instanceof MemKey)
         if (k2 instanceof MemKey)
-          cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount;
+          cmp = ((MemKey) k2).getKVCount() - ((MemKey) k1).getKVCount();
         else
           cmp = 1;
       else if (k2 instanceof MemKey)
@@ -104,7 +104,7 @@ class PartialMutationSkippingIterator extends SkippingIterator implements Interr
 
   @Override
   protected void consume() throws IOException {
-    while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount)
+    while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).getKVCount() > kvCount)
       getSource().next();
   }
 
@@ -191,9 +191,15 @@ public class InMemoryMap {
 
   private volatile String memDumpFile = null;
   private final String memDumpDir;
+  private final String mapType;
 
   private Map<String,Set<ByteSequence>> lggroups;
 
+  public static final String TYPE_NATIVE_MAP_WRAPPER = "NativeMapWrapper";
+  public static final String TYPE_DEFAULT_MAP = "DefaultMap";
+  public static final String TYPE_LOCALITY_GROUP_MAP = "LocalityGroupMap";
+  public static final String TYPE_LOCALITY_GROUP_MAP_NATIVE = "LocalityGroupMap with native";
+
   public InMemoryMap(boolean useNativeMap, String memDumpDir) {
     this(new HashMap<String,Set<ByteSequence>>(), useNativeMap, memDumpDir);
   }
@@ -202,10 +208,26 @@ public class InMemoryMap {
     this.memDumpDir = memDumpDir;
     this.lggroups = lggroups;
 
-    if (lggroups.size() == 0)
+    if (lggroups.size() == 0) {
       map = newMap(useNativeMap);
-    else
+      mapType = useNativeMap ? TYPE_NATIVE_MAP_WRAPPER : TYPE_DEFAULT_MAP;
+    } else {
       map = new LocalityGroupMap(lggroups, useNativeMap);
+      mapType = useNativeMap ? TYPE_LOCALITY_GROUP_MAP_NATIVE : TYPE_LOCALITY_GROUP_MAP; // native flag selects the *_NATIVE type, as in the branch above
+    }
+  }
+
+  /**
+   * Description of the type of SimpleMap that is created.
+   * <p>
+   * If no locality groups are present, the SimpleMap is either TYPE_DEFAULT_MAP or TYPE_NATIVE_MAP_WRAPPER. If there are one or more locality groups, then the
+   * InMemoryMap has an array of simple maps and is either TYPE_LOCALITY_GROUP_MAP, whose maps are DefaultMaps, or TYPE_LOCALITY_GROUP_MAP_NATIVE, whose maps are
+   * NativeMapWrappers.
+   *
+   * @return String that describes the Map type
+   */
+  public String getMapType() {
+    return mapType;
   }
 
   public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError {
@@ -772,7 +794,7 @@ public class InMemoryMap {
     while (iter.hasTop() && activeIters.size() > 0) {
       // RFile does not support MemKey, so we move the kv count into the value only for the RFile.
       // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written
-      Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount);
+      Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).getKVCount());
       out.append(iter.getTopKey(), newValue);
       iter.next();
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/41e002d7/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
index 443ffb2..f3cdb21 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
@@ -16,15 +16,17 @@
  */
 package org.apache.accumulo.tserver;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.accumulo.core.data.Key;
 
-class MemKey extends Key {
+@VisibleForTesting // widened to public so tests outside this package can inspect kvCount directly
+public class MemKey extends Key {
 
-  int kvCount;
+  private int kvCount; // per-key counter, shown as "mc=" by toString()
 
   public MemKey(byte[] row, byte[] cf, byte[] cq, byte[] cv, long ts, boolean del, boolean copy, int mc) {
     super(row, cf, cq, cv, ts, del, copy);
@@ -45,6 +47,10 @@ class MemKey extends Key {
     return super.toString() + " mc=" + kvCount;
   }
 
+  public int getKVCount() { // read-only accessor now that kvCount is private
+    return this.kvCount;
+  }
+
   @Override
   public Object clone() throws CloneNotSupportedException {
     return super.clone();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/41e002d7/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
index f728a9b..7e1435e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -519,36 +520,25 @@ public class NativeMap implements Iterable<Map.Entry<Key,Value>> {
     }
   }
 
-  private void _mutate(Mutation mutation, int mutationCount) {
+  private int _mutate(Mutation mutation, int mutationCount) { // each ColumnUpdate gets its own count; returns the next unused count
 
     List<ColumnUpdate> updates = mutation.getUpdates();
     if (updates.size() == 1) {
       ColumnUpdate update = updates.get(0);
       singleUpdate(nmPointer, mutation.getRow(), update.getColumnFamily(), update.getColumnQualifier(), update.getColumnVisibility(), update.getTimestamp(),
-          update.isDeleted(), update.getValue(), mutationCount);
+          update.isDeleted(), update.getValue(), mutationCount++);
     } else if (updates.size() > 1) {
       long uid = startUpdate(nmPointer, mutation.getRow());
       for (ColumnUpdate update : updates) {
         update(nmPointer, uid, update.getColumnFamily(), update.getColumnQualifier(), update.getColumnVisibility(), update.getTimestamp(), update.isDeleted(),
-            update.getValue(), mutationCount);
+            update.getValue(), mutationCount++);
       }
-
     }
+    return mutationCount; // unchanged when the mutation has no updates
   }
 
   public void mutate(Mutation mutation, int mutationCount) {
-    wlock.lock();
-    try {
-      if (nmPointer == 0) {
-        throw new IllegalStateException("Native Map Deleted");
-      }
-
-      modCount++;
-
-      _mutate(mutation, mutationCount);
-    } finally {
-      wlock.unlock();
-    }
+    mutate(Collections.singletonList(mutation), mutationCount); // NOTE(review): relies on the List overload doing the lock/nmPointer checks removed above
   }
 
   public void mutate(List<Mutation> mutations, int mutationCount) {
@@ -567,8 +557,7 @@ public class NativeMap implements Iterable<Map.Entry<Key,Value>> {
         int count = 0;
         while (iter.hasNext() && count < 10) {
           Mutation mutation = iter.next();
-          _mutate(mutation, mutationCount);
-          mutationCount++;
+          mutationCount = _mutate(mutation, mutationCount); // carry the counter across mutations instead of restarting it
           count += mutation.size();
         }
       } finally {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/41e002d7/test/src/test/java/org/apache/accumulo/test/InMemoryMapIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/InMemoryMapIT.java b/test/src/test/java/org/apache/accumulo/test/InMemoryMapIT.java
new file mode 100644
index 0000000..102762a
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/InMemoryMapIT.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import com.google.common.collect.ImmutableSet;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.test.functional.NativeMapIT;
+import org.apache.accumulo.tserver.InMemoryMap;
+import org.apache.accumulo.tserver.MemKey;
+import org.apache.accumulo.tserver.NativeMap;
+import org.apache.hadoop.io.Text;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration Test for https://issues.apache.org/jira/browse/ACCUMULO-4148
+ * <p>
+ * A user had a problem writing one Mutation with multiple KV pairs that had the same key. Doing so should write out all pairs in all mutations with a unique id. In
+ * typical operation, you would only see the last one when scanning. The user had a combiner on the table, and they noticed that when using InMemoryMap with
+ * NativeMapWrapper, only the last KV pair was ever written. When InMemoryMap used DefaultMap, all KV pairs were added and the behavior worked as expected.
+ *
+ * This IT inserts a variety of Mutations with and without the same KV pairs and then inspects the result of InMemoryMap mutate, looking for the unique id stored with
+ * each key. This unique id, shown as mc= in the MemKey toString, was originally used for scan Isolation. Writing the same key multiple times in the same
+ * mutation is a secondary use case, discussed in https://issues.apache.org/jira/browse/ACCUMULO-227. In addition to NativeMapWrapper and DefaultMap,
+ * LocalityGroupMap was added in https://issues.apache.org/jira/browse/ACCUMULO-112.
+ *
+ * This test has to be an IT in accumulo-test, because libaccumulo is built in 'integration-test' phase of accumulo-native, which currently runs right before
+ * accumulo-test. The tests for DefaultMap could move to a unit test in tserver, but they are here for convenience of viewing both at the same time.
+ */
+public class InMemoryMapIT {
+
+  private static final Logger log = LoggerFactory.getLogger(InMemoryMapIT.class);
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  @BeforeClass
+  public static void ensureNativeLibrary() throws FileNotFoundException {
+    File nativeMapLocation = NativeMapIT.nativeMapLocation();
+    log.debug("Native map location {}", nativeMapLocation);
+    NativeMap.loadNativeLib(Collections.singletonList(nativeMapLocation));
+    if (!NativeMap.isLoaded()) {
+      fail("Missing the native library from " + nativeMapLocation.getAbsolutePath() + "\nYou need to build the libaccumulo binary first. "
+          + "\nTry running 'mvn clean install -Dit.test=InMemoryMapIT -Dtest=foo -DfailIfNoTests=false -Dfindbugs.skip -Dcheckstyle.skip'");
+      // afterwards, you can run the following
+      // mvn clean verify -Dit.test=InMemoryMapIT -Dtest=foo -DfailIfNoTests=false -Dfindbugs.skip -Dcheckstyle.skip -pl :accumulo-test
+    }
+    log.debug("Native map loaded");
+
+  }
+
+  @Test
+  public void testOneMutationOneKey() {
+    Mutation m = new Mutation("a");
+    m.put(new Text("1cf"), new Text("1cq"), new Value("vala".getBytes()));
+
+    assertEquivalentMutate(m);
+  }
+
+  @Test
+  public void testOneMutationManyKeys() {
+    Mutation m = new Mutation("a");
+    for (int i = 1; i < 6; i++) {
+      m.put(new Text("2cf" + i), new Text("2cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+
+    assertEquivalentMutate(m);
+  }
+
+  @Test
+  public void testOneMutationManySameKeys() {
+    Mutation m = new Mutation("a");
+    for (int i = 1; i <= 5; i++) {
+      // same keys
+      m.put(new Text("3cf"), new Text("3cq"), new Value(Integer.toString(i).getBytes()));
+    }
+
+    assertEquivalentMutate(m);
+  }
+
+  @Test
+  public void testMultipleMutationsOneKey() {
+    Mutation m1 = new Mutation("a");
+    m1.put(new Text("4cf"), new Text("4cq"), new Value("vala".getBytes()));
+    Mutation m2 = new Mutation("b");
+    m2.put(new Text("4cf"), new Text("4cq"), new Value("vala".getBytes()));
+
+    assertEquivalentMutate(Arrays.asList(m1, m2));
+  }
+
+  @Test
+  public void testMultipleMutationsSameOneKey() {
+    Mutation m1 = new Mutation("a");
+    m1.put(new Text("5cf"), new Text("5cq"), new Value("vala".getBytes()));
+    Mutation m2 = new Mutation("a");
+    m2.put(new Text("5cf"), new Text("5cq"), new Value("vala".getBytes()));
+
+    assertEquivalentMutate(Arrays.asList(m1, m2));
+  }
+
+  @Test
+  public void testMutlipleMutationsMultipleKeys() {
+    Mutation m1 = new Mutation("a");
+    for (int i = 1; i < 6; i++) {
+      m1.put(new Text("6cf" + i), new Text("6cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+    Mutation m2 = new Mutation("b");
+    for (int i = 1; i < 3; i++) {
+      m2.put(new Text("6cf" + i), new Text("6cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+
+    assertEquivalentMutate(Arrays.asList(m1, m2));
+  }
+
+  @Test
+  public void testMultipleMutationsMultipleSameKeys() {
+    Mutation m1 = new Mutation("a");
+    for (int i = 1; i < 3; i++) {
+      m1.put(new Text("7cf"), new Text("7cq"), new Value(Integer.toString(i).getBytes()));
+    }
+    Mutation m2 = new Mutation("a");
+    for (int i = 1; i < 4; i++) {
+      m2.put(new Text("7cf"), new Text("7cq"), new Value(Integer.toString(i).getBytes()));
+    }
+
+    assertEquivalentMutate(Arrays.asList(m1, m2));
+  }
+
+  @Test
+  public void testMultipleMutationsMultipleKeysSomeSame() {
+    Mutation m1 = new Mutation("a");
+    for (int i = 1; i < 2; i++) {
+      m1.put(new Text("8cf"), new Text("8cq"), new Value(Integer.toString(i).getBytes()));
+    }
+    for (int i = 1; i < 3; i++) {
+      m1.put(new Text("8cf" + i), new Text("8cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+    for (int i = 1; i < 2; i++) {
+      m1.put(new Text("8cf" + i), new Text("8cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+    Mutation m2 = new Mutation("a");
+    for (int i = 1; i < 3; i++) {
+      m2.put(new Text("8cf"), new Text("8cq"), new Value(Integer.toString(i).getBytes()));
+    }
+    for (int i = 1; i < 4; i++) {
+      m2.put(new Text("8cf" + i), new Text("8cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+    Mutation m3 = new Mutation("b");
+    for (int i = 1; i < 3; i++) {
+      m3.put(new Text("8cf" + i), new Text("8cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+
+    assertEquivalentMutate(Arrays.asList(m1, m2, m3));
+  }
+
+  private void assertEquivalentMutate(Mutation m) {
+    assertEquivalentMutate(Collections.singletonList(m));
+  }
+
+  private void assertEquivalentMutate(List<Mutation> mutations) {
+    InMemoryMap defaultMap = null;
+    InMemoryMap nativeMapWrapper = null;
+    InMemoryMap localityGroupMap = null;
+    InMemoryMap localityGroupMapWithNative = null;
+
+    try {
+      defaultMap = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+      nativeMapWrapper = new InMemoryMap(true, tempFolder.newFolder().getAbsolutePath());
+      localityGroupMap = new InMemoryMap(getLocalityGroups(), false, tempFolder.newFolder().getAbsolutePath());
+      localityGroupMapWithNative = new InMemoryMap(getLocalityGroups(), true, tempFolder.newFolder().getAbsolutePath()); // must be native-backed
+    } catch (IOException e) {
+      log.error("Error getting new InMemoryMap ", e);
+      fail(e.getMessage());
+    }
+
+    defaultMap.mutate(mutations);
+    nativeMapWrapper.mutate(mutations);
+    localityGroupMap.mutate(mutations);
+    localityGroupMapWithNative.mutate(mutations);
+
+    // let's use the transitive property to assert all four are equivalent
+    assertMutatesEquivalent(mutations, defaultMap, nativeMapWrapper);
+    assertMutatesEquivalent(mutations, defaultMap, localityGroupMap);
+    assertMutatesEquivalent(mutations, defaultMap, localityGroupMapWithNative);
+  }
+
+  /**
+   * Assert that a set of mutations mutate to equivalent map in both of the InMemoryMaps.
+   * <p>
+   * In this case, equivalent means 3 things.
+   * <ul>
+   * <li>The size of both maps generated is equal to the number of key value pairs in all mutations passed</li>
+   * <li>The size of the map generated from the first InMemoryMap equals the size of the map generated from the second</li>
+   * <li>Each key value pair in each mutated map has a unique id (kvCount)</li>
+   * </ul>
+   *
+   * @param mutations
+   *          List of mutations
+   * @param imm1
+   *          InMemoryMap to compare
+   * @param imm2
+   *          InMemoryMap to compare
+   */
+  private void assertMutatesEquivalent(List<Mutation> mutations, InMemoryMap imm1, InMemoryMap imm2) {
+    int mutationKVPairs = countKVPairs(mutations);
+
+    List<MemKey> memKeys1 = getArrayOfMemKeys(imm1);
+    List<MemKey> memKeys2 = getArrayOfMemKeys(imm2);
+
+    assertEquals("Not all key value pairs included: " + dumpInMemoryMap(imm1, memKeys1), mutationKVPairs, memKeys1.size());
+    assertEquals("InMemoryMaps differ in size: " + dumpInMemoryMap(imm1, memKeys1) + "\n" + dumpInMemoryMap(imm2, memKeys2), memKeys1.size(), memKeys2.size());
+    assertEquals("InMemoryMap did not have distinct kvCounts " + dumpInMemoryMap(imm1, memKeys1), mutationKVPairs, getUniqKVCount(memKeys1));
+    assertEquals("InMemoryMap did not have distinct kvCounts " + dumpInMemoryMap(imm2, memKeys2), mutationKVPairs, getUniqKVCount(memKeys2));
+
+  }
+
+  private int countKVPairs(List<Mutation> mutations) {
+    int count = 0;
+    for (Mutation m : mutations) {
+      count += m.size();
+    }
+    return count;
+  }
+
+  private List<MemKey> getArrayOfMemKeys(InMemoryMap imm) {
+    SortedKeyValueIterator<Key,Value> skvi = imm.compactionIterator();
+
+    List<MemKey> memKeys = new ArrayList<MemKey>();
+    try {
+      skvi.seek(new Range(), new ArrayList<ByteSequence>(), false); // everything
+      while (skvi.hasTop()) {
+        memKeys.add((MemKey) skvi.getTopKey());
+        skvi.next();
+      }
+    } catch (IOException ex) {
+      log.error("Error getting memkeys", ex);
+      throw new RuntimeException(ex);
+    }
+
+    return memKeys;
+  }
+
+  private String dumpInMemoryMap(InMemoryMap map, List<MemKey> memkeys) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("InMemoryMap type ");
+    sb.append(map.getMapType());
+    sb.append("\n");
+
+    for (MemKey mk : memkeys) {
+      sb.append("  ");
+      sb.append(mk.toString());
+      sb.append("\n");
+    }
+
+    return sb.toString();
+  }
+
+  private int getUniqKVCount(List<MemKey> memKeys) {
+    List<Integer> kvCounts = new ArrayList<Integer>();
+    for (MemKey m : memKeys) {
+      kvCounts.add(m.getKVCount());
+    }
+    return ImmutableSet.copyOf(kvCounts).size();
+  }
+
+  private Map<String,Set<ByteSequence>> getLocalityGroups() {
+    Map<String,Set<ByteSequence>> locgro = new HashMap<String,Set<ByteSequence>>();
+    locgro.put("a", newCFSet("cf", "cf2"));
+    locgro.put("b", newCFSet("cf3", "cf4")); // distinct name; reusing "a" would overwrite the first group
+    return locgro;
+  }
+
+  // from InMemoryMapTest
+  private Set<ByteSequence> newCFSet(String... cfs) {
+    HashSet<ByteSequence> cfSet = new HashSet<ByteSequence>();
+    for (String cf : cfs) {
+      cfSet.add(new ArrayByteSequence(cf));
+    }
+    return cfSet;
+  }
+
+}


[3/6] accumulo git commit: ACCUMULO-4148 Native map doesn't increment counter for every cell, potentially losing some updates

Posted by el...@apache.org.
ACCUMULO-4148 Native map doesn't increment counter for every cell, potentially losing some updates

The problem here is that InMemoryMap$DefaultMap increments the mutationCount or
kvCount for every key value pair in every Mutation that is passed in.  The
NativeMap, which is used by the InMemoryMap$NativeMapWrapper does not.  This
causes 2 different issues in the NativeMap.

1) When a single Mutation has duplicate key value pairs, only the last is
recorded, because they all have the same mutationCount and the earlier ones are
hidden.
2) When multiple Mutations are passed in at the same time, the mutationCount
or kvCount starts over for each Mutation.  This can also lead to hiding of key
value pairs.

The tests added here expose both the issues as well as do some asserts on
simple Mutations.  A few tweaks were made to expose information to these tests.

1) Made MemKey public, made its kvCount private and exposed it via a getter so
we can inspect directly instead of parsing the toString.  Required changing
some calls to kvCount to use the getter.

2) Added a final String to the InMemoryMap which is set during construction.  This
allows you to see what kind of SimpleMap was set up in the InMemoryMap.

Closes apache/accumulo#82

Signed-off-by: Josh Elser <el...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/41e002d7
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/41e002d7
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/41e002d7

Branch: refs/heads/master
Commit: 41e002d7a07415ccd2826b2db98783187b9422b3
Parents: 0206d78
Author: Michael Wall <mj...@gmail.com>
Authored: Thu Feb 25 21:20:15 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Fri Apr 1 09:46:19 2016 -0400

----------------------------------------------------------------------
 .../apache/accumulo/tserver/InMemoryMap.java    |  32 +-
 .../org/apache/accumulo/tserver/MemKey.java     |  10 +-
 .../org/apache/accumulo/tserver/NativeMap.java  |  25 +-
 .../org/apache/accumulo/test/InMemoryMapIT.java | 319 +++++++++++++++++++
 4 files changed, 361 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/41e002d7/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index 57c9440..614b34a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@ -82,7 +82,7 @@ class MemKeyComparator implements Comparator<Key>, Serializable {
     if (cmp == 0) {
       if (k1 instanceof MemKey)
         if (k2 instanceof MemKey)
-          cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount;
+          cmp = ((MemKey) k2).getKVCount() - ((MemKey) k1).getKVCount();
         else
           cmp = 1;
       else if (k2 instanceof MemKey)
@@ -104,7 +104,7 @@ class PartialMutationSkippingIterator extends SkippingIterator implements Interr
 
   @Override
   protected void consume() throws IOException {
-    while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount)
+    while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).getKVCount() > kvCount)
       getSource().next();
   }
 
@@ -191,9 +191,15 @@ public class InMemoryMap {
 
   private volatile String memDumpFile = null;
   private final String memDumpDir;
+  private final String mapType;
 
   private Map<String,Set<ByteSequence>> lggroups;
 
+  public static final String TYPE_NATIVE_MAP_WRAPPER = "NativeMapWrapper";
+  public static final String TYPE_DEFAULT_MAP = "DefaultMap";
+  public static final String TYPE_LOCALITY_GROUP_MAP = "LocalityGroupMap";
+  public static final String TYPE_LOCALITY_GROUP_MAP_NATIVE = "LocalityGroupMap with native";
+
   public InMemoryMap(boolean useNativeMap, String memDumpDir) {
     this(new HashMap<String,Set<ByteSequence>>(), useNativeMap, memDumpDir);
   }
@@ -202,10 +208,26 @@ public class InMemoryMap {
     this.memDumpDir = memDumpDir;
     this.lggroups = lggroups;
 
-    if (lggroups.size() == 0)
+    if (lggroups.size() == 0) {
       map = newMap(useNativeMap);
-    else
+      mapType = useNativeMap ? TYPE_NATIVE_MAP_WRAPPER : TYPE_DEFAULT_MAP;
+    } else {
       map = new LocalityGroupMap(lggroups, useNativeMap);
+      mapType = useNativeMap ? TYPE_LOCALITY_GROUP_MAP_NATIVE : TYPE_LOCALITY_GROUP_MAP; // native flag selects the *_NATIVE type, as in the branch above
+    }
+  }
+
+  /**
+   * Description of the type of SimpleMap that is created.
+   * <p>
+   * If no locality groups are present, the SimpleMap is either TYPE_DEFAULT_MAP or TYPE_NATIVE_MAP_WRAPPER. If there are one or more locality groups, then the
+   * InMemoryMap has an array of simple maps and is either TYPE_LOCALITY_GROUP_MAP, whose maps are DefaultMaps, or TYPE_LOCALITY_GROUP_MAP_NATIVE, whose maps are
+   * NativeMapWrappers.
+   *
+   * @return String that describes the Map type
+   */
+  public String getMapType() {
+    return mapType;
   }
 
   public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError {
@@ -772,7 +794,7 @@ public class InMemoryMap {
     while (iter.hasTop() && activeIters.size() > 0) {
       // RFile does not support MemKey, so we move the kv count into the value only for the RFile.
       // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written
-      Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount);
+      Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).getKVCount());
       out.append(iter.getTopKey(), newValue);
       iter.next();
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/41e002d7/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
index 443ffb2..f3cdb21 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
@@ -16,15 +16,17 @@
  */
 package org.apache.accumulo.tserver;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.accumulo.core.data.Key;
 
-class MemKey extends Key {
+@VisibleForTesting // widened to public so tests outside this package can inspect kvCount directly
+public class MemKey extends Key {
 
-  int kvCount;
+  private int kvCount; // per-key counter, shown as "mc=" by toString()
 
   public MemKey(byte[] row, byte[] cf, byte[] cq, byte[] cv, long ts, boolean del, boolean copy, int mc) {
     super(row, cf, cq, cv, ts, del, copy);
@@ -45,6 +47,10 @@ class MemKey extends Key {
     return super.toString() + " mc=" + kvCount;
   }
 
+  public int getKVCount() { // read-only accessor now that kvCount is private
+    return this.kvCount;
+  }
+
   @Override
   public Object clone() throws CloneNotSupportedException {
     return super.clone();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/41e002d7/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
index f728a9b..7e1435e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -519,36 +520,25 @@ public class NativeMap implements Iterable<Map.Entry<Key,Value>> {
     }
   }
 
-  private void _mutate(Mutation mutation, int mutationCount) {
+  private int _mutate(Mutation mutation, int mutationCount) { // each ColumnUpdate gets its own count; returns the next unused count
 
     List<ColumnUpdate> updates = mutation.getUpdates();
     if (updates.size() == 1) {
       ColumnUpdate update = updates.get(0);
       singleUpdate(nmPointer, mutation.getRow(), update.getColumnFamily(), update.getColumnQualifier(), update.getColumnVisibility(), update.getTimestamp(),
-          update.isDeleted(), update.getValue(), mutationCount);
+          update.isDeleted(), update.getValue(), mutationCount++);
     } else if (updates.size() > 1) {
       long uid = startUpdate(nmPointer, mutation.getRow());
       for (ColumnUpdate update : updates) {
         update(nmPointer, uid, update.getColumnFamily(), update.getColumnQualifier(), update.getColumnVisibility(), update.getTimestamp(), update.isDeleted(),
-            update.getValue(), mutationCount);
+            update.getValue(), mutationCount++);
       }
-
     }
+    return mutationCount; // unchanged when the mutation has no updates
   }
 
   public void mutate(Mutation mutation, int mutationCount) {
-    wlock.lock();
-    try {
-      if (nmPointer == 0) {
-        throw new IllegalStateException("Native Map Deleted");
-      }
-
-      modCount++;
-
-      _mutate(mutation, mutationCount);
-    } finally {
-      wlock.unlock();
-    }
+    mutate(Collections.singletonList(mutation), mutationCount); // NOTE(review): relies on the List overload doing the lock/nmPointer checks removed above
   }
 
   public void mutate(List<Mutation> mutations, int mutationCount) {
@@ -567,8 +557,7 @@ public class NativeMap implements Iterable<Map.Entry<Key,Value>> {
         int count = 0;
         while (iter.hasNext() && count < 10) {
           Mutation mutation = iter.next();
-          _mutate(mutation, mutationCount);
-          mutationCount++;
+          mutationCount = _mutate(mutation, mutationCount);
           count += mutation.size();
         }
       } finally {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/41e002d7/test/src/test/java/org/apache/accumulo/test/InMemoryMapIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/InMemoryMapIT.java b/test/src/test/java/org/apache/accumulo/test/InMemoryMapIT.java
new file mode 100644
index 0000000..102762a
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/InMemoryMapIT.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import com.google.common.collect.ImmutableSet;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.test.functional.NativeMapIT;
+import org.apache.accumulo.tserver.InMemoryMap;
+import org.apache.accumulo.tserver.MemKey;
+import org.apache.accumulo.tserver.NativeMap;
+import org.apache.hadoop.io.Text;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration Test for https://issues.apache.org/jira/browse/ACCUMULO-4148
+ * <p>
+ * User had problem writing one Mutation with multiple KV pairs that had the same key. Doing so should write out all pairs in all mutations with a unique id. In
+ * typical operation, you would only see the last one when scanning. User had a combiner on the table, and they noticed that when using InMemoryMap with
+ * NativeMapWrapper, only the last KV pair was ever written. When InMemoryMap used DefaultMap, all KV pairs were added and the behavior worked as expected.
+ * <p>
+ * This IT inserts a variety of Mutations with and without the same KV pairs and then inspects result of InMemoryMap mutate, looking for unique id stored with
+ * each key. This unique id, shown as mc= in the MemKey toString, was originally used for scan isolation. Writing the same key multiple times in the same
+ * mutation is a secondary use case, discussed in https://issues.apache.org/jira/browse/ACCUMULO-227. In addition to NativeMapWrapper and DefaultMap,
+ * LocalityGroupMap was added in https://issues.apache.org/jira/browse/ACCUMULO-112.
+ * <p>
+ * This test has to be an IT in accumulo-test, because libaccumulo is built in 'integration-test' phase of accumulo-native, which currently runs right before
+ * accumulo-test. The tests for DefaultMap could move to a unit test in tserver, but they are here for convenience of viewing both at the same time.
+ */
+public class InMemoryMapIT {
+
+  private static final Logger log = LoggerFactory.getLogger(InMemoryMapIT.class);
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  @BeforeClass
+  public static void ensureNativeLibrary() throws FileNotFoundException {
+    File nativeMapLocation = NativeMapIT.nativeMapLocation();
+    log.debug("Native map location " + nativeMapLocation);
+    NativeMap.loadNativeLib(Collections.singletonList(nativeMapLocation));
+    if (!NativeMap.isLoaded()) {
+      fail("Missing the native library from " + nativeMapLocation.getAbsolutePath() + "\nYou need to build the libaccumulo binary first. "
+          + "\nTry running 'mvn clean install -Dit.test=InMemoryMapIT -Dtest=foo -DfailIfNoTests=false -Dfindbugs.skip -Dcheckstyle.skip'");
+      // afterwards, you can run the following
+      // mvn clean verify -Dit.test=InMemoryMapIT -Dtest=foo -DfailIfNoTests=false -Dfindbugs.skip -Dcheckstyle.skip -pl :accumulo-test
+    }
+    log.debug("Native map loaded");
+
+  }
+
+  @Test
+  public void testOneMutationOneKey() {
+    Mutation m = new Mutation("a");
+    m.put(new Text("1cf"), new Text("1cq"), new Value("vala".getBytes()));
+
+    assertEquivalentMutate(m);
+  }
+
+  @Test
+  public void testOneMutationManyKeys() throws IOException {
+    Mutation m = new Mutation("a");
+    for (int i = 1; i < 6; i++) {
+      m.put(new Text("2cf" + i), new Text("2cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+
+    assertEquivalentMutate(m);
+  }
+
+  @Test
+  public void testOneMutationManySameKeys() {
+    Mutation m = new Mutation("a");
+    for (int i = 1; i <= 5; i++) {
+      // same keys
+      m.put(new Text("3cf"), new Text("3cq"), new Value(Integer.toString(i).getBytes()));
+    }
+
+    assertEquivalentMutate(m);
+  }
+
+  @Test
+  public void testMultipleMutationsOneKey() {
+    Mutation m1 = new Mutation("a");
+    m1.put(new Text("4cf"), new Text("4cq"), new Value("vala".getBytes()));
+    Mutation m2 = new Mutation("b");
+    m2.put(new Text("4cf"), new Text("4cq"), new Value("vala".getBytes()));
+
+    assertEquivalentMutate(Arrays.asList(m1, m2));
+  }
+
+  @Test
+  public void testMultipleMutationsSameOneKey() {
+    Mutation m1 = new Mutation("a");
+    m1.put(new Text("5cf"), new Text("5cq"), new Value("vala".getBytes()));
+    Mutation m2 = new Mutation("a");
+    m2.put(new Text("5cf"), new Text("5cq"), new Value("vala".getBytes()));
+
+    assertEquivalentMutate(Arrays.asList(m1, m2));
+  }
+
+  @Test
+  public void testMutlipleMutationsMultipleKeys() {
+    Mutation m1 = new Mutation("a");
+    for (int i = 1; i < 6; i++) {
+      m1.put(new Text("6cf" + i), new Text("6cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+    Mutation m2 = new Mutation("b");
+    for (int i = 1; i < 3; i++) {
+      m2.put(new Text("6cf" + i), new Text("6cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+
+    assertEquivalentMutate(Arrays.asList(m1, m2));
+  }
+
+  @Test
+  public void testMultipleMutationsMultipleSameKeys() {
+    Mutation m1 = new Mutation("a");
+    for (int i = 1; i < 3; i++) {
+      m1.put(new Text("7cf"), new Text("7cq"), new Value(Integer.toString(i).getBytes()));
+    }
+    Mutation m2 = new Mutation("a");
+    for (int i = 1; i < 4; i++) {
+      m2.put(new Text("7cf"), new Text("7cq"), new Value(Integer.toString(i).getBytes()));
+    }
+
+    assertEquivalentMutate(Arrays.asList(m1, m2));
+  }
+
+  @Test
+  public void testMultipleMutationsMultipleKeysSomeSame() {
+    Mutation m1 = new Mutation("a");
+    for (int i = 1; i < 2; i++) {
+      m1.put(new Text("8cf"), new Text("8cq"), new Value(Integer.toString(i).getBytes()));
+    }
+    for (int i = 1; i < 3; i++) {
+      m1.put(new Text("8cf" + i), new Text("8cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+    for (int i = 1; i < 2; i++) {
+      m1.put(new Text("8cf" + i), new Text("8cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+    Mutation m2 = new Mutation("a");
+    for (int i = 1; i < 3; i++) {
+      m2.put(new Text("8cf"), new Text("8cq"), new Value(Integer.toString(i).getBytes()));
+    }
+    for (int i = 1; i < 4; i++) {
+      m2.put(new Text("8cf" + i), new Text("8cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+    Mutation m3 = new Mutation("b");
+    for (int i = 1; i < 3; i++) {
+      m3.put(new Text("8cf" + i), new Text("8cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+
+    assertEquivalentMutate(Arrays.asList(m1, m2, m3));
+  }
+
+  private void assertEquivalentMutate(Mutation m) {
+    assertEquivalentMutate(Collections.singletonList(m));
+  }
+
+  private void assertEquivalentMutate(List<Mutation> mutations) {
+    InMemoryMap defaultMap = null;
+    InMemoryMap nativeMapWrapper = null;
+    InMemoryMap localityGroupMap = null;
+    InMemoryMap localityGroupMapWithNative = null;
+
+    try {
+      defaultMap = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+      nativeMapWrapper = new InMemoryMap(true, tempFolder.newFolder().getAbsolutePath());
+      localityGroupMap = new InMemoryMap(getLocalityGroups(), false, tempFolder.newFolder().getAbsolutePath());
+      localityGroupMapWithNative = new InMemoryMap(getLocalityGroups(), true, tempFolder.newFolder().getAbsolutePath());
+    } catch (IOException e) {
+      log.error("Error getting new InMemoryMap ", e);
+      fail(e.getMessage());
+    }
+
+    defaultMap.mutate(mutations);
+    nativeMapWrapper.mutate(mutations);
+    localityGroupMap.mutate(mutations);
+    localityGroupMapWithNative.mutate(mutations);
+
+    // let's use the transitive property to assert all four are equivalent
+    assertMutatesEquivalent(mutations, defaultMap, nativeMapWrapper);
+    assertMutatesEquivalent(mutations, defaultMap, localityGroupMap);
+    assertMutatesEquivalent(mutations, defaultMap, localityGroupMapWithNative);
+  }
+
+  /**
+   * Assert that a set of mutations mutate to equivalent map in both of the InMemoryMaps.
+   * <p>
+   * In this case, equivalent means 3 things.
+   * <ul>
+   * <li>The size of both maps generated is equal to the number of key value pairs in all mutations passed</li>
+   * <li>The size of the map generated from the first InMemoryMap equals the size of the map generated from the second</li>
+   * <li>Each key value pair in each mutated map has a unique id (kvCount)</li>
+   * </ul>
+   *
+   * @param mutations
+   *          List of mutations
+   * @param imm1
+   *          InMemoryMap to compare
+   * @param imm2
+   *          InMemoryMap to compare
+   */
+  private void assertMutatesEquivalent(List<Mutation> mutations, InMemoryMap imm1, InMemoryMap imm2) {
+    int mutationKVPairs = countKVPairs(mutations);
+
+    List<MemKey> memKeys1 = getArrayOfMemKeys(imm1);
+    List<MemKey> memKeys2 = getArrayOfMemKeys(imm2);
+
+    assertEquals("Not all key value pairs included: " + dumpInMemoryMap(imm1, memKeys1), mutationKVPairs, memKeys1.size());
+    assertEquals("InMemoryMaps differ in size: " + dumpInMemoryMap(imm1, memKeys1) + "\n" + dumpInMemoryMap(imm2, memKeys2), memKeys1.size(), memKeys2.size());
+    assertEquals("InMemoryMap did not have distinct kvCounts " + dumpInMemoryMap(imm1, memKeys1), mutationKVPairs, getUniqKVCount(memKeys1));
+    assertEquals("InMemoryMap did not have distinct kvCounts " + dumpInMemoryMap(imm2, memKeys2), mutationKVPairs, getUniqKVCount(memKeys2));
+
+  }
+
+  private int countKVPairs(List<Mutation> mutations) {
+    int count = 0;
+    for (Mutation m : mutations) {
+      count += m.size();
+    }
+    return count;
+  }
+
+  private List<MemKey> getArrayOfMemKeys(InMemoryMap imm) {
+    SortedKeyValueIterator<Key,Value> skvi = imm.compactionIterator();
+
+    List<MemKey> memKeys = new ArrayList<MemKey>();
+    try {
+      skvi.seek(new Range(), new ArrayList<ByteSequence>(), false); // everything
+      while (skvi.hasTop()) {
+        memKeys.add((MemKey) skvi.getTopKey());
+        skvi.next();
+      }
+    } catch (IOException ex) {
+      log.error("Error getting memkeys", ex);
+      throw new RuntimeException(ex);
+    }
+
+    return memKeys;
+  }
+
+  private String dumpInMemoryMap(InMemoryMap map, List<MemKey> memkeys) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("InMemoryMap type ");
+    sb.append(map.getMapType());
+    sb.append("\n");
+
+    for (MemKey mk : memkeys) {
+      sb.append("  ");
+      sb.append(mk.toString());
+      sb.append("\n");
+    }
+
+    return sb.toString();
+  }
+
+  private int getUniqKVCount(List<MemKey> memKeys) {
+    List<Integer> kvCounts = new ArrayList<Integer>();
+    for (MemKey m : memKeys) {
+      kvCounts.add(m.getKVCount());
+    }
+    return ImmutableSet.copyOf(kvCounts).size();
+  }
+
+  private Map<String,Set<ByteSequence>> getLocalityGroups() {
+    Map<String,Set<ByteSequence>> locgro = new HashMap<String,Set<ByteSequence>>();
+    locgro.put("a", newCFSet("cf", "cf2"));
+    locgro.put("b", newCFSet("cf3", "cf4"));
+    return locgro;
+  }
+
+  // from InMemoryMapTest
+  private Set<ByteSequence> newCFSet(String... cfs) {
+    HashSet<ByteSequence> cfSet = new HashSet<ByteSequence>();
+    for (String cf : cfs) {
+      cfSet.add(new ArrayByteSequence(cf));
+    }
+    return cfSet;
+  }
+
+}


[5/6] accumulo git commit: Merge branch '1.6' into 1.7

Posted by el...@apache.org.
Merge branch '1.6' into 1.7

Conflicts:
	server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f181cf6a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f181cf6a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f181cf6a

Branch: refs/heads/1.7
Commit: f181cf6a90a913e5453352a56d3ab470f15b068c
Parents: 2b286ba 41e002d
Author: Josh Elser <el...@apache.org>
Authored: Fri Apr 1 09:56:17 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Apr 1 09:56:17 2016 -0400

----------------------------------------------------------------------
 .../apache/accumulo/tserver/InMemoryMap.java    |  28 +-
 .../org/apache/accumulo/tserver/MemKey.java     |  10 +-
 .../accumulo/tserver/MemKeyComparator.java      |   2 +-
 .../org/apache/accumulo/tserver/NativeMap.java  |  27 +-
 .../PartialMutationSkippingIterator.java        |   2 +-
 .../org/apache/accumulo/test/InMemoryMapIT.java | 319 +++++++++++++++++++
 6 files changed, 362 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f181cf6a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f181cf6a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f181cf6a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyComparator.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyComparator.java
index 739b923,0000000..a623cac
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyComparator.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyComparator.java
@@@ -1,44 -1,0 +1,44 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver;
 +
 +import java.io.Serializable;
 +import java.util.Comparator;
 +
 +import org.apache.accumulo.core.data.Key;
 +
 +class MemKeyComparator implements Comparator<Key>, Serializable {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  @Override
 +  public int compare(Key k1, Key k2) {
 +    int cmp = k1.compareTo(k2);
 +
 +    if (cmp == 0) {
 +      if (k1 instanceof MemKey)
 +        if (k2 instanceof MemKey)
-           cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount;
++          cmp = ((MemKey) k2).getKVCount() - ((MemKey) k1).getKVCount();
 +        else
 +          cmp = 1;
 +      else if (k2 instanceof MemKey)
 +        cmp = -1;
 +    }
 +
 +    return cmp;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f181cf6a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
index 6eb8e4e,7e1435e..a6f7cf1
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
@@@ -502,29 -531,17 +503,18 @@@ public class NativeMap implements Itera
        long uid = startUpdate(nmPointer, mutation.getRow());
        for (ColumnUpdate update : updates) {
          update(nmPointer, uid, update.getColumnFamily(), update.getColumnQualifier(), update.getColumnVisibility(), update.getTimestamp(), update.isDeleted(),
-             update.getValue(), mutationCount);
+             update.getValue(), mutationCount++);
        }
- 
      }
+     return mutationCount;
    }
  
 +  @VisibleForTesting
    public void mutate(Mutation mutation, int mutationCount) {
-     wlock.lock();
-     try {
-       if (nmPointer == 0) {
-         throw new IllegalStateException("Native Map Deleted");
-       }
- 
-       modCount++;
- 
-       _mutate(mutation, mutationCount);
-     } finally {
-       wlock.unlock();
-     }
+     mutate(Collections.singletonList(mutation), mutationCount);
    }
  
 -  public void mutate(List<Mutation> mutations, int mutationCount) {
 +  void mutate(List<Mutation> mutations, int mutationCount) {
      Iterator<Mutation> iter = mutations.iterator();
  
      while (iter.hasNext()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f181cf6a/server/tserver/src/main/java/org/apache/accumulo/tserver/PartialMutationSkippingIterator.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/PartialMutationSkippingIterator.java
index 5d0733b,0000000..3373c88
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/PartialMutationSkippingIterator.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/PartialMutationSkippingIterator.java
@@@ -1,54 -1,0 +1,54 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver;
 +
 +import java.io.IOException;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.SkippingIterator;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
 +
 +class PartialMutationSkippingIterator extends SkippingIterator implements InterruptibleIterator {
 +
 +  private int kvCount;
 +
 +  public PartialMutationSkippingIterator(SortedKeyValueIterator<Key,Value> source, int maxKVCount) {
 +    setSource(source);
 +    this.kvCount = maxKVCount;
 +  }
 +
 +  @Override
 +  protected void consume() throws IOException {
-     while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount)
++    while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).getKVCount() > kvCount)
 +      getSource().next();
 +  }
 +
 +  @Override
 +  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +    return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount);
 +  }
 +
 +  @Override
 +  public void setInterruptFlag(AtomicBoolean flag) {
 +    ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
 +  }
 +
 +}


[2/6] accumulo git commit: ACCUMULO-4148 Native map doesn't increment counter for every cell, potentially losing some updates

Posted by el...@apache.org.
ACCUMULO-4148 Native map doesn't increment counter for every cell, potentially losing some updates

The problem here is that InMemoryMap$DefaultMap increments the mutationCount or
kvCount for every key value pair in every Mutation that is passed in.  The
NativeMap, which is used by the InMemoryMap$NativeMapWrapper does not.  This
causes 2 different issues in the NativeMap.

1) When a single Mutation has duplicate key value pairs, only the last is
recorded, because they all have the same mutationCount and the earlier ones are
hidden.
2) When multiple Mutations are passed in at the same time, the mutationCount
or kvCount starts over for each Mutation.  This can also lead to hiding of key
value pairs.

The tests added here expose both the issues as well as do some asserts on
simple Mutations.  A few tweaks were made to expose information to these tests.

1) Made MemKey public, made its kvCount private and exposed it via a getter so
we can inspect it directly instead of parsing the toString.  Required changing
some direct accesses of kvCount to use the getter.

2) Added a final String to the InMemoryMap which is set during construction.  This
allows you to see what kind of SimpleMap was set up in the InMemoryMap.

Closes apache/accumulo#82

Signed-off-by: Josh Elser <el...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/41e002d7
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/41e002d7
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/41e002d7

Branch: refs/heads/1.7
Commit: 41e002d7a07415ccd2826b2db98783187b9422b3
Parents: 0206d78
Author: Michael Wall <mj...@gmail.com>
Authored: Thu Feb 25 21:20:15 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Fri Apr 1 09:46:19 2016 -0400

----------------------------------------------------------------------
 .../apache/accumulo/tserver/InMemoryMap.java    |  32 +-
 .../org/apache/accumulo/tserver/MemKey.java     |  10 +-
 .../org/apache/accumulo/tserver/NativeMap.java  |  25 +-
 .../org/apache/accumulo/test/InMemoryMapIT.java | 319 +++++++++++++++++++
 4 files changed, 361 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/41e002d7/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index 57c9440..614b34a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@ -82,7 +82,7 @@ class MemKeyComparator implements Comparator<Key>, Serializable {
     if (cmp == 0) {
       if (k1 instanceof MemKey)
         if (k2 instanceof MemKey)
-          cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount;
+          cmp = ((MemKey) k2).getKVCount() - ((MemKey) k1).getKVCount();
         else
           cmp = 1;
       else if (k2 instanceof MemKey)
@@ -104,7 +104,7 @@ class PartialMutationSkippingIterator extends SkippingIterator implements Interr
 
   @Override
   protected void consume() throws IOException {
-    while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount)
+    while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).getKVCount() > kvCount)
       getSource().next();
   }
 
@@ -191,9 +191,15 @@ public class InMemoryMap {
 
   private volatile String memDumpFile = null;
   private final String memDumpDir;
+  private final String mapType;
 
   private Map<String,Set<ByteSequence>> lggroups;
 
+  public static final String TYPE_NATIVE_MAP_WRAPPER = "NativeMapWrapper";
+  public static final String TYPE_DEFAULT_MAP = "DefaultMap";
+  public static final String TYPE_LOCALITY_GROUP_MAP = "LocalityGroupMap";
+  public static final String TYPE_LOCALITY_GROUP_MAP_NATIVE = "LocalityGroupMap with native";
+
   public InMemoryMap(boolean useNativeMap, String memDumpDir) {
     this(new HashMap<String,Set<ByteSequence>>(), useNativeMap, memDumpDir);
   }
@@ -202,10 +208,26 @@ public class InMemoryMap {
     this.memDumpDir = memDumpDir;
     this.lggroups = lggroups;
 
-    if (lggroups.size() == 0)
+    if (lggroups.size() == 0) {
       map = newMap(useNativeMap);
-    else
+      mapType = useNativeMap ? TYPE_NATIVE_MAP_WRAPPER : TYPE_DEFAULT_MAP;
+    } else {
       map = new LocalityGroupMap(lggroups, useNativeMap);
+      mapType = useNativeMap ? TYPE_LOCALITY_GROUP_MAP_NATIVE : TYPE_LOCALITY_GROUP_MAP;
+    }
+  }
+
+  /**
+   * Description of the type of SimpleMap that is created.
+   * <p>
+   * If no locality groups are present, the SimpleMap is either TYPE_DEFAULT_MAP or TYPE_NATIVE_MAP_WRAPPER. If there are one or more locality groups, then the
+   * SimpleMap is a LocalityGroupMap, reported as TYPE_LOCALITY_GROUP_MAP when its per-group maps are DefaultMaps or TYPE_LOCALITY_GROUP_MAP_NATIVE when they
+   * are NativeMapWrappers.
+   *
+   * @return String that describes the Map type
+   */
+  public String getMapType() {
+    return mapType;
   }
 
   public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError {
@@ -772,7 +794,7 @@ public class InMemoryMap {
     while (iter.hasTop() && activeIters.size() > 0) {
       // RFile does not support MemKey, so we move the kv count into the value only for the RFile.
       // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written
-      Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount);
+      Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).getKVCount());
       out.append(iter.getTopKey(), newValue);
       iter.next();
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/41e002d7/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
index 443ffb2..f3cdb21 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
@@ -16,15 +16,17 @@
  */
 package org.apache.accumulo.tserver;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.accumulo.core.data.Key;
 
-class MemKey extends Key {
+@VisibleForTesting
+public class MemKey extends Key {
 
-  int kvCount;
+  private int kvCount;
 
   public MemKey(byte[] row, byte[] cf, byte[] cq, byte[] cv, long ts, boolean del, boolean copy, int mc) {
     super(row, cf, cq, cv, ts, del, copy);
@@ -45,6 +47,10 @@ class MemKey extends Key {
     return super.toString() + " mc=" + kvCount;
   }
 
+  public int getKVCount() {
+    return this.kvCount;
+  }
+
   @Override
   public Object clone() throws CloneNotSupportedException {
     return super.clone();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/41e002d7/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
index f728a9b..7e1435e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -519,36 +520,25 @@ public class NativeMap implements Iterable<Map.Entry<Key,Value>> {
     }
   }
 
-  private void _mutate(Mutation mutation, int mutationCount) {
+  private int _mutate(Mutation mutation, int mutationCount) { // one count per ColumnUpdate; returns the next unused count
 
     List<ColumnUpdate> updates = mutation.getUpdates();
     if (updates.size() == 1) {
       ColumnUpdate update = updates.get(0);
       singleUpdate(nmPointer, mutation.getRow(), update.getColumnFamily(), update.getColumnQualifier(), update.getColumnVisibility(), update.getTimestamp(),
-          update.isDeleted(), update.getValue(), mutationCount);
+          update.isDeleted(), update.getValue(), mutationCount++); // post-increment so the next update cannot reuse this count
     } else if (updates.size() > 1) {
       long uid = startUpdate(nmPointer, mutation.getRow());
       for (ColumnUpdate update : updates) {
         update(nmPointer, uid, update.getColumnFamily(), update.getColumnQualifier(), update.getColumnVisibility(), update.getTimestamp(), update.isDeleted(),
-            update.getValue(), mutationCount);
+            update.getValue(), mutationCount++); // distinct count per update keeps duplicate keys from hiding each other
       }
-
     }
+    return mutationCount; // caller continues numbering the next Mutation from here
   }
 
   public void mutate(Mutation mutation, int mutationCount) {
-    wlock.lock();
-    try {
-      if (nmPointer == 0) {
-        throw new IllegalStateException("Native Map Deleted");
-      }
-
-      modCount++;
-
-      _mutate(mutation, mutationCount);
-    } finally {
-      wlock.unlock();
-    }
+    mutate(Collections.singletonList(mutation), mutationCount); // share the batch path so a lone Mutation is counted per key/value too
   }
 
   public void mutate(List<Mutation> mutations, int mutationCount) {
@@ -567,8 +557,7 @@ public class NativeMap implements Iterable<Map.Entry<Key,Value>> {
         int count = 0;
         while (iter.hasNext() && count < 10) {
           Mutation mutation = iter.next();
-          _mutate(mutation, mutationCount);
-          mutationCount++;
+          mutationCount = _mutate(mutation, mutationCount);
           count += mutation.size();
         }
       } finally {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/41e002d7/test/src/test/java/org/apache/accumulo/test/InMemoryMapIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/InMemoryMapIT.java b/test/src/test/java/org/apache/accumulo/test/InMemoryMapIT.java
new file mode 100644
index 0000000..102762a
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/InMemoryMapIT.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import com.google.common.collect.ImmutableSet;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.test.functional.NativeMapIT;
+import org.apache.accumulo.tserver.InMemoryMap;
+import org.apache.accumulo.tserver.MemKey;
+import org.apache.accumulo.tserver.NativeMap;
+import org.apache.hadoop.io.Text;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration Test for https://issues.apache.org/jira/browse/ACCUMULO-4148
+ * <p>
+ * A user had a problem writing one Mutation with multiple KV pairs that had the same key. Doing so should write out all pairs in all mutations with a unique id. In
+ * typical operation, you would only see the last one when scanning. The user had a combiner on the table, and they noticed that when using InMemoryMap with
+ * NativeMapWrapper, only the last KV pair was ever written. When InMemoryMap used DefaultMap, all KV pairs were added and the behavior worked as expected.
+ *
+ * This IT inserts a variety of Mutations with and without the same KV pairs and then inspects the result of InMemoryMap mutate, looking for the unique id stored with
+ * each key. This unique id, shown as mc= in the MemKey toString, was originally used for scan isolation. Writing the same key multiple times in the same
+ * mutation is a secondary use case, discussed in https://issues.apache.org/jira/browse/ACCUMULO-227. In addition to NativeMapWrapper and DefaultMap,
+ * LocalityGroupMap was added in https://issues.apache.org/jira/browse/ACCUMULO-112.
+ *
+ * This test has to be an IT in accumulo-test, because libaccumulo is built in the 'integration-test' phase of accumulo-native, which currently runs right before
+ * accumulo-test. The tests for DefaultMap could move to a unit test in tserver, but they are here for convenience of viewing both at the same time.
+ */
+public class InMemoryMapIT {
+
+  private static final Logger log = LoggerFactory.getLogger(InMemoryMapIT.class);
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  @BeforeClass
+  public static void ensureNativeLibrary() throws FileNotFoundException {
+    File nativeMapLocation = NativeMapIT.nativeMapLocation();
+    log.debug("Native map location {}", nativeMapLocation);
+    NativeMap.loadNativeLib(Collections.singletonList(nativeMapLocation));
+    if (!NativeMap.isLoaded()) {
+      fail("Missing the native library from " + nativeMapLocation.getAbsolutePath() + "\nYou need to build the libaccumulo binary first. "
+          + "\nTry running 'mvn clean install -Dit.test=InMemoryMapIT -Dtest=foo -DfailIfNoTests=false -Dfindbugs.skip -Dcheckstyle.skip'");
+      // afterwards, you can run the following
+      // mvn clean verify -Dit.test=InMemoryMapIT -Dtest=foo -DfailIfNoTests=false -Dfindbugs.skip -Dcheckstyle.skip -pl :accumulo-test
+    }
+    log.debug("Native map loaded");
+
+  }
+
+  @Test
+  public void testOneMutationOneKey() {
+    Mutation m = new Mutation("a");
+    m.put(new Text("1cf"), new Text("1cq"), new Value("vala".getBytes()));
+
+    assertEquivalentMutate(m);
+  }
+
+  @Test
+  public void testOneMutationManyKeys() throws IOException {
+    Mutation m = new Mutation("a");
+    for (int i = 1; i < 6; i++) {
+      m.put(new Text("2cf" + i), new Text("2cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+
+    assertEquivalentMutate(m);
+  }
+
+  @Test
+  public void testOneMutationManySameKeys() {
+    Mutation m = new Mutation("a");
+    for (int i = 1; i <= 5; i++) {
+      // same keys
+      m.put(new Text("3cf"), new Text("3cq"), new Value(Integer.toString(i).getBytes()));
+    }
+
+    assertEquivalentMutate(m);
+  }
+
+  @Test
+  public void testMultipleMutationsOneKey() {
+    Mutation m1 = new Mutation("a");
+    m1.put(new Text("4cf"), new Text("4cq"), new Value("vala".getBytes()));
+    Mutation m2 = new Mutation("b");
+    m2.put(new Text("4cf"), new Text("4cq"), new Value("vala".getBytes()));
+
+    assertEquivalentMutate(Arrays.asList(m1, m2));
+  }
+
+  @Test
+  public void testMultipleMutationsSameOneKey() {
+    Mutation m1 = new Mutation("a");
+    m1.put(new Text("5cf"), new Text("5cq"), new Value("vala".getBytes()));
+    Mutation m2 = new Mutation("a");
+    m2.put(new Text("5cf"), new Text("5cq"), new Value("vala".getBytes()));
+
+    assertEquivalentMutate(Arrays.asList(m1, m2));
+  }
+
+  @Test
+  public void testMutlipleMutationsMultipleKeys() {
+    Mutation m1 = new Mutation("a");
+    for (int i = 1; i < 6; i++) {
+      m1.put(new Text("6cf" + i), new Text("6cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+    Mutation m2 = new Mutation("b");
+    for (int i = 1; i < 3; i++) {
+      m2.put(new Text("6cf" + i), new Text("6cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+
+    assertEquivalentMutate(Arrays.asList(m1, m2));
+  }
+
+  @Test
+  public void testMultipleMutationsMultipleSameKeys() {
+    Mutation m1 = new Mutation("a");
+    for (int i = 1; i < 3; i++) {
+      m1.put(new Text("7cf"), new Text("7cq"), new Value(Integer.toString(i).getBytes()));
+    }
+    Mutation m2 = new Mutation("a");
+    for (int i = 1; i < 4; i++) {
+      m2.put(new Text("7cf"), new Text("7cq"), new Value(Integer.toString(i).getBytes()));
+    }
+
+    assertEquivalentMutate(Arrays.asList(m1, m2));
+  }
+
+  @Test
+  public void testMultipleMutationsMultipleKeysSomeSame() {
+    Mutation m1 = new Mutation("a");
+    for (int i = 1; i < 2; i++) {
+      m1.put(new Text("8cf"), new Text("8cq"), new Value(Integer.toString(i).getBytes()));
+    }
+    for (int i = 1; i < 3; i++) {
+      m1.put(new Text("8cf" + i), new Text("8cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+    for (int i = 1; i < 2; i++) {
+      m1.put(new Text("8cf" + i), new Text("8cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+    Mutation m2 = new Mutation("a");
+    for (int i = 1; i < 3; i++) {
+      m2.put(new Text("8cf"), new Text("8cq"), new Value(Integer.toString(i).getBytes()));
+    }
+    for (int i = 1; i < 4; i++) {
+      m2.put(new Text("8cf" + i), new Text("8cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+    Mutation m3 = new Mutation("b");
+    for (int i = 1; i < 3; i++) {
+      m3.put(new Text("8cf" + i), new Text("8cq" + i), new Value(Integer.toString(i).getBytes()));
+    }
+
+    assertEquivalentMutate(Arrays.asList(m1, m2, m3));
+  }
+
+  private void assertEquivalentMutate(Mutation m) {
+    assertEquivalentMutate(Collections.singletonList(m));
+  }
+
+  private void assertEquivalentMutate(List<Mutation> mutations) {
+    InMemoryMap defaultMap = null;
+    InMemoryMap nativeMapWrapper = null;
+    InMemoryMap localityGroupMap = null;
+    InMemoryMap localityGroupMapWithNative = null;
+
+    try {
+      defaultMap = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath());
+      nativeMapWrapper = new InMemoryMap(true, tempFolder.newFolder().getAbsolutePath());
+      localityGroupMap = new InMemoryMap(getLocalityGroups(), false, tempFolder.newFolder().getAbsolutePath());
+      localityGroupMapWithNative = new InMemoryMap(getLocalityGroups(), true, tempFolder.newFolder().getAbsolutePath());
+    } catch (IOException e) {
+      log.error("Error getting new InMemoryMap ", e);
+      fail(e.getMessage());
+    }
+
+    defaultMap.mutate(mutations);
+    nativeMapWrapper.mutate(mutations);
+    localityGroupMap.mutate(mutations);
+    localityGroupMapWithNative.mutate(mutations);
+
+    // let's use the transitive property to assert all four are equivalent
+    assertMutatesEquivalent(mutations, defaultMap, nativeMapWrapper);
+    assertMutatesEquivalent(mutations, defaultMap, localityGroupMap);
+    assertMutatesEquivalent(mutations, defaultMap, localityGroupMapWithNative);
+  }
+
+  /**
+   * Assert that a set of mutations mutate to equivalent map in both of the InMemoryMaps.
+   * <p>
+   * In this case, equivalent means 3 things.
+   * <ul>
+   * <li>The size of both maps generated is equal to the number of key value pairs in all mutations passed</li>
+   * <li>The size of the map generated from the first InMemoryMap equals the size of the map generated from the second</li>
+   * <li>Each key value pair in each mutated map has a unique id (kvCount)</li>
+   * </ul>
+   *
+   * @param mutations
+   *          List of mutations
+   * @param imm1
+   *          InMemoryMap to compare
+   * @param imm2
+   *          InMemoryMap to compare
+   */
+  private void assertMutatesEquivalent(List<Mutation> mutations, InMemoryMap imm1, InMemoryMap imm2) {
+    int mutationKVPairs = countKVPairs(mutations);
+
+    List<MemKey> memKeys1 = getArrayOfMemKeys(imm1);
+    List<MemKey> memKeys2 = getArrayOfMemKeys(imm2);
+
+    assertEquals("Not all key value pairs included: " + dumpInMemoryMap(imm1, memKeys1), mutationKVPairs, memKeys1.size());
+    assertEquals("InMemoryMaps differ in size: " + dumpInMemoryMap(imm1, memKeys1) + "\n" + dumpInMemoryMap(imm2, memKeys2), memKeys1.size(), memKeys2.size());
+    assertEquals("InMemoryMap did not have distinct kvCounts " + dumpInMemoryMap(imm1, memKeys1), mutationKVPairs, getUniqKVCount(memKeys1));
+    assertEquals("InMemoryMap did not have distinct kvCounts " + dumpInMemoryMap(imm2, memKeys2), mutationKVPairs, getUniqKVCount(memKeys2));
+
+  }
+
+  private int countKVPairs(List<Mutation> mutations) {
+    int count = 0;
+    for (Mutation m : mutations) {
+      count += m.size();
+    }
+    return count;
+  }
+
+  private List<MemKey> getArrayOfMemKeys(InMemoryMap imm) {
+    SortedKeyValueIterator<Key,Value> skvi = imm.compactionIterator();
+
+    List<MemKey> memKeys = new ArrayList<MemKey>();
+    try {
+      skvi.seek(new Range(), new ArrayList<ByteSequence>(), false); // everything
+      while (skvi.hasTop()) {
+        memKeys.add((MemKey) skvi.getTopKey());
+        skvi.next();
+      }
+    } catch (IOException ex) {
+      log.error("Error getting memkeys", ex);
+      throw new RuntimeException(ex);
+    }
+
+    return memKeys;
+  }
+
+  private String dumpInMemoryMap(InMemoryMap map, List<MemKey> memkeys) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("InMemoryMap type ");
+    sb.append(map.getMapType());
+    sb.append("\n");
+
+    for (MemKey mk : memkeys) {
+      sb.append("  ");
+      sb.append(mk.toString());
+      sb.append("\n");
+    }
+
+    return sb.toString();
+  }
+
+  private int getUniqKVCount(List<MemKey> memKeys) {
+    List<Integer> kvCounts = new ArrayList<Integer>();
+    for (MemKey m : memKeys) {
+      kvCounts.add(m.getKVCount());
+    }
+    return ImmutableSet.copyOf(kvCounts).size();
+  }
+
+  private Map<String,Set<ByteSequence>> getLocalityGroups() {
+    Map<String,Set<ByteSequence>> locgro = new HashMap<String,Set<ByteSequence>>();
+    locgro.put("a", newCFSet("cf", "cf2"));
+    locgro.put("b", newCFSet("cf3", "cf4"));
+    return locgro;
+  }
+
+  // from InMemoryMapTest
+  private Set<ByteSequence> newCFSet(String... cfs) {
+    HashSet<ByteSequence> cfSet = new HashSet<ByteSequence>();
+    for (String cf : cfs) {
+      cfSet.add(new ArrayByteSequence(cf));
+    }
+    return cfSet;
+  }
+
+}


[6/6] accumulo git commit: Merge branch '1.7'

Posted by el...@apache.org.
Merge branch '1.7'

Conflicts:
	server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0dd1d6a5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0dd1d6a5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0dd1d6a5

Branch: refs/heads/master
Commit: 0dd1d6a511114f8ff25188e695c1fca8f1139559
Parents: 02450e4 f181cf6
Author: Josh Elser <el...@apache.org>
Authored: Fri Apr 1 12:52:15 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Apr 1 12:52:15 2016 -0400

----------------------------------------------------------------------
 .../apache/accumulo/tserver/InMemoryMap.java    |  23 +-
 .../org/apache/accumulo/tserver/MemKey.java     |  10 +-
 .../accumulo/tserver/MemKeyComparator.java      |   2 +-
 .../org/apache/accumulo/tserver/NativeMap.java  |  27 +-
 .../PartialMutationSkippingIterator.java        |   2 +-
 .../org/apache/accumulo/test/InMemoryMapIT.java | 361 +++++++++++++++++++
 6 files changed, 401 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0dd1d6a5/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index f5141ff,72f84b5..1b02f14
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@@ -91,58 -79,43 +92,65 @@@ public class InMemoryMap 
  
    private Map<String,Set<ByteSequence>> lggroups;
  
 +  private static Pair<SamplerConfigurationImpl,Sampler> getSampler(AccumuloConfiguration config) {
 +    try {
 +      SamplerConfigurationImpl sampleConfig = SamplerConfigurationImpl.newSamplerConfig(config);
 +      if (sampleConfig == null) {
 +        return new Pair<>(null, null);
 +      }
 +
 +      return new Pair<>(sampleConfig, SamplerFactory.newSampler(sampleConfig, config));
 +    } catch (IOException e) {
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
+   public static final String TYPE_NATIVE_MAP_WRAPPER = "NativeMapWrapper";
+   public static final String TYPE_DEFAULT_MAP = "DefaultMap";
+   public static final String TYPE_LOCALITY_GROUP_MAP = "LocalityGroupMap";
+   public static final String TYPE_LOCALITY_GROUP_MAP_NATIVE = "LocalityGroupMap with native";
+ 
 -  public InMemoryMap(boolean useNativeMap, String memDumpDir) {
 -    this(new HashMap<String,Set<ByteSequence>>(), useNativeMap, memDumpDir);
 +  private AtomicReference<Pair<SamplerConfigurationImpl,Sampler>> samplerRef = new AtomicReference<>(null);
 +
 +  private AccumuloConfiguration config;
 +
 +  // defer creating sampler until first write. This was done because an empty sample map configured with no sampler will not flush after a user changes sample
 +  // config.
 +  private Sampler getOrCreateSampler() {
 +    Pair<SamplerConfigurationImpl,Sampler> pair = samplerRef.get();
 +    if (pair == null) {
 +      pair = getSampler(config);
 +      if (!samplerRef.compareAndSet(null, pair)) {
 +        pair = samplerRef.get();
 +      }
 +    }
 +
 +    return pair.getSecond();
    }
  
 -  public InMemoryMap(Map<String,Set<ByteSequence>> lggroups, boolean useNativeMap, String memDumpDir) {
 -    this.memDumpDir = memDumpDir;
 -    this.lggroups = lggroups;
 +  public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError {
 +
 +    boolean useNativeMap = config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED);
 +
 +    this.memDumpDir = config.get(Property.TSERV_MEMDUMP_DIR);
 +    this.lggroups = LocalityGroupUtil.getLocalityGroups(config);
 +
 +    this.config = config;
 +
 +    SimpleMap allMap;
 +    SimpleMap sampleMap;
  
      if (lggroups.size() == 0) {
 -      map = newMap(useNativeMap);
 +      allMap = newMap(useNativeMap);
 +      sampleMap = newMap(useNativeMap);
+       mapType = useNativeMap ? TYPE_NATIVE_MAP_WRAPPER : TYPE_DEFAULT_MAP;
      } else {
 -      map = new LocalityGroupMap(lggroups, useNativeMap);
 +      allMap = new LocalityGroupMap(lggroups, useNativeMap);
 +      sampleMap = new LocalityGroupMap(lggroups, useNativeMap);
+       mapType = useNativeMap ? TYPE_LOCALITY_GROUP_MAP_NATIVE : TYPE_LOCALITY_GROUP_MAP; // _NATIVE labels NativeMapWrapper-backed groups
      }
 -  }
  
 -  /**
 -   * Description of the type of SimpleMap that is created.
 -   * <p>
 -   * If no locality groups are present, the SimpleMap is either TYPE_DEFAULT_MAP or TYPE_NATIVE_MAP_WRAPPER. If there is one more locality groups, then the
 -   * InMemoryMap has an array for simple maps that either contain either TYPE_LOCALITY_GROUP_MAP which contains DefaultMaps or TYPE_LOCALITY_GROUP_MAP_NATIVE
 -   * which contains NativeMapWrappers.
 -   *
 -   * @return String that describes the Map type
 -   */
 -  public String getMapType() {
 -    return mapType;
 -  }
 -
 -  public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError {
 -    this(LocalityGroupUtil.getLocalityGroups(config), config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR));
 +    map = new SampleMap(allMap, sampleMap);
    }
  
    private static SimpleMap newMap(boolean useNativeMap) {
@@@ -157,6 -130,6 +165,19 @@@
      return new DefaultMap();
    }
  
++  /**
++   * Description of the type of SimpleMap that is created.
++   * <p>
++   * If no locality groups are present, the SimpleMap is either TYPE_DEFAULT_MAP or TYPE_NATIVE_MAP_WRAPPER. If there are one or more locality groups, then the
++   * InMemoryMap holds an array of simple maps and its type is either TYPE_LOCALITY_GROUP_MAP, when those maps are DefaultMaps, or TYPE_LOCALITY_GROUP_MAP_NATIVE,
++   * when those maps are NativeMapWrappers.
++   *
++   * @return String that describes the Map type
++   */
++  public String getMapType() {
++    return mapType;
++  }
++
++
    private interface SimpleMap {
      Value get(Key key);
  
@@@ -856,8 -692,10 +877,8 @@@
      while (iter.hasTop() && activeIters.size() > 0) {
        // RFile does not support MemKey, so we move the kv count into the value only for the RFile.
        // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written
-       out.append(iter.getTopKey(), MemValue.encode(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount));
 -      Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).getKVCount());
 -      out.append(iter.getTopKey(), newValue);
++      out.append(iter.getTopKey(), MemValue.encode(iter.getTopValue(), ((MemKey) iter.getTopKey()).getKVCount()));
        iter.next();
 -
      }
    }
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0dd1d6a5/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0dd1d6a5/test/src/main/java/org/apache/accumulo/test/InMemoryMapIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/InMemoryMapIT.java
index 0000000,0000000..cdb09d9
new file mode 100644
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/InMemoryMapIT.java
@@@ -1,0 -1,0 +1,361 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.test;
++
++import com.google.common.collect.ImmutableSet;
++import java.io.File;
++import java.io.FileNotFoundException;
++import java.io.IOException;
++import java.util.ArrayList;
++import java.util.Arrays;
++import java.util.Collections;
++import java.util.HashMap;
++import java.util.HashSet;
++import java.util.List;
++import java.util.Map;
++import java.util.Map.Entry;
++import java.util.Set;
++
++import org.apache.accumulo.core.conf.ConfigurationCopy;
++import org.apache.accumulo.core.conf.Property;
++import org.apache.accumulo.core.data.ArrayByteSequence;
++import org.apache.accumulo.core.data.ByteSequence;
++import org.apache.accumulo.core.data.Key;
++import org.apache.accumulo.core.data.Mutation;
++import org.apache.accumulo.core.data.Range;
++import org.apache.accumulo.core.data.Value;
++import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
++import org.apache.accumulo.test.functional.NativeMapIT;
++import org.apache.accumulo.tserver.InMemoryMap;
++import org.apache.accumulo.tserver.MemKey;
++import org.apache.accumulo.tserver.NativeMap;
++import org.apache.hadoop.io.Text;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.fail;
++import org.junit.BeforeClass;
++import org.junit.Rule;
++import org.junit.Test;
++import org.junit.rules.TemporaryFolder;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++/**
++ * Integration Test for https://issues.apache.org/jira/browse/ACCUMULO-4148
++ * <p>
++ * A user had a problem writing one Mutation with multiple KV pairs that had the same key. Doing so should write out all pairs in all mutations with a unique id. In
++ * typical operation, you would only see the last one when scanning. The user had a combiner on the table, and they noticed that when using InMemoryMap with
++ * NativeMapWrapper, only the last KV pair was ever written. When InMemoryMap used DefaultMap, all KV pairs were added and the behavior worked as expected.
++ *
++ * This IT inserts a variety of Mutations with and without the same KV pairs and then inspects the result of InMemoryMap mutate, looking for the unique id stored with
++ * each key. This unique id, shown as mc= in the MemKey toString, was originally used for scan isolation. Writing the same key multiple times in the same
++ * mutation is a secondary use case, discussed in https://issues.apache.org/jira/browse/ACCUMULO-227. In addition to NativeMapWrapper and DefaultMap,
++ * LocalityGroupMap was added in https://issues.apache.org/jira/browse/ACCUMULO-112.
++ *
++ * This test has to be an IT in accumulo-test, because libaccumulo is built in the 'integration-test' phase of accumulo-native, which currently runs right before
++ * accumulo-test. The tests for DefaultMap could move to a unit test in tserver, but they are here for convenience of viewing both at the same time.
++ */
++public class InMemoryMapIT {
++
++  private static final Logger log = LoggerFactory.getLogger(InMemoryMapIT.class);
++
++  @Rule
++  public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
++
++  @BeforeClass
++  public static void ensureNativeLibrary() throws FileNotFoundException {
++    File nativeMapLocation = NativeMapIT.nativeMapLocation();
++    log.debug("Native map location " + nativeMapLocation);
++    NativeMap.loadNativeLib(Collections.singletonList(nativeMapLocation));
++    if (!NativeMap.isLoaded()) {
++      fail("Missing the native library from " + nativeMapLocation.getAbsolutePath() + "\nYou need to build the libaccumulo binary first. "
++          + "\nTry running 'mvn clean install -Dit.test=InMemoryMapIT -Dtest=foo -DfailIfNoTests=false -Dfindbugs.skip -Dcheckstyle.skip'");
++      // afterwards, you can run the following
++      // mvn clean verify -Dit.test=InMemoryMapIT -Dtest=foo -DfailIfNoTests=false -Dfindbugs.skip -Dcheckstyle.skip -pl :accumulo-test
++    }
++    log.debug("Native map loaded");
++
++  }
++
++  @Test
++  public void testOneMutationOneKey() {
++    Mutation m = new Mutation("a");
++    m.put(new Text("1cf"), new Text("1cq"), new Value("vala".getBytes()));
++
++    assertEquivalentMutate(m);
++  }
++
++  @Test
++  public void testOneMutationManyKeys() throws IOException {
++    Mutation m = new Mutation("a");
++    for (int i = 1; i < 6; i++) {
++      m.put(new Text("2cf" + i), new Text("2cq" + i), new Value(Integer.toString(i).getBytes()));
++    }
++
++    assertEquivalentMutate(m);
++  }
++
++  @Test
++  public void testOneMutationManySameKeys() {
++    Mutation m = new Mutation("a");
++    for (int i = 1; i <= 5; i++) {
++      // same keys
++      m.put(new Text("3cf"), new Text("3cq"), new Value(Integer.toString(i).getBytes()));
++    }
++
++    assertEquivalentMutate(m);
++  }
++
++  @Test
++  public void testMultipleMutationsOneKey() {
++    Mutation m1 = new Mutation("a");
++    m1.put(new Text("4cf"), new Text("4cq"), new Value("vala".getBytes()));
++    Mutation m2 = new Mutation("b");
++    m2.put(new Text("4cf"), new Text("4cq"), new Value("vala".getBytes()));
++
++    assertEquivalentMutate(Arrays.asList(m1, m2));
++  }
++
++  @Test
++  public void testMultipleMutationsSameOneKey() {
++    Mutation m1 = new Mutation("a");
++    m1.put(new Text("5cf"), new Text("5cq"), new Value("vala".getBytes()));
++    Mutation m2 = new Mutation("a");
++    m2.put(new Text("5cf"), new Text("5cq"), new Value("vala".getBytes()));
++
++    assertEquivalentMutate(Arrays.asList(m1, m2));
++  }
++
++  @Test
++  public void testMutlipleMutationsMultipleKeys() {
++    Mutation m1 = new Mutation("a");
++    for (int i = 1; i < 6; i++) {
++      m1.put(new Text("6cf" + i), new Text("6cq" + i), new Value(Integer.toString(i).getBytes()));
++    }
++    Mutation m2 = new Mutation("b");
++    for (int i = 1; i < 3; i++) {
++      m2.put(new Text("6cf" + i), new Text("6cq" + i), new Value(Integer.toString(i).getBytes()));
++    }
++
++    assertEquivalentMutate(Arrays.asList(m1, m2));
++  }
++
++  @Test
++  public void testMultipleMutationsMultipleSameKeys() {
++    Mutation m1 = new Mutation("a");
++    for (int i = 1; i < 3; i++) {
++      m1.put(new Text("7cf"), new Text("7cq"), new Value(Integer.toString(i).getBytes()));
++    }
++    Mutation m2 = new Mutation("a");
++    for (int i = 1; i < 4; i++) {
++      m2.put(new Text("7cf"), new Text("7cq"), new Value(Integer.toString(i).getBytes()));
++    }
++
++    assertEquivalentMutate(Arrays.asList(m1, m2));
++  }
++
++  @Test
++  public void testMultipleMutationsMultipleKeysSomeSame() {
++    Mutation m1 = new Mutation("a");
++    for (int i = 1; i < 2; i++) {
++      m1.put(new Text("8cf"), new Text("8cq"), new Value(Integer.toString(i).getBytes()));
++    }
++    for (int i = 1; i < 3; i++) {
++      m1.put(new Text("8cf" + i), new Text("8cq" + i), new Value(Integer.toString(i).getBytes()));
++    }
++    for (int i = 1; i < 2; i++) {
++      m1.put(new Text("8cf" + i), new Text("8cq" + i), new Value(Integer.toString(i).getBytes()));
++    }
++    Mutation m2 = new Mutation("a");
++    for (int i = 1; i < 3; i++) {
++      m2.put(new Text("8cf"), new Text("8cq"), new Value(Integer.toString(i).getBytes()));
++    }
++    for (int i = 1; i < 4; i++) {
++      m2.put(new Text("8cf" + i), new Text("8cq" + i), new Value(Integer.toString(i).getBytes()));
++    }
++    Mutation m3 = new Mutation("b");
++    for (int i = 1; i < 3; i++) {
++      m3.put(new Text("8cf" + i), new Text("8cq" + i), new Value(Integer.toString(i).getBytes()));
++    }
++
++    assertEquivalentMutate(Arrays.asList(m1, m2, m3));
++  }
++
++  private void assertEquivalentMutate(Mutation m) {
++    assertEquivalentMutate(Collections.singletonList(m));
++  }
++
++  /**
++   * Applies the same mutations to four differently configured InMemoryMaps (default, native,
++   * locality groups, locality groups + native) and asserts that all four end up equivalent.
++   *
++   * @param mutations
++   *          the mutations to apply to every map
++   */
++  private void assertEquivalentMutate(List<Mutation> mutations) {
++    InMemoryMap defaultMap;
++    InMemoryMap nativeMapWrapper;
++    InMemoryMap localityGroupMap;
++    InMemoryMap localityGroupMapWithNative;
++
++    try {
++      // The two plain maps get an empty locality-group list; presumably this keeps them out of the
++      // locality-group map implementation.
++      Map<String,String> defaultMapConfig = newMapConfig(false);
++      defaultMapConfig.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "");
++      Map<String,String> nativeMapConfig = newMapConfig(true);
++      nativeMapConfig.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "");
++      // The locality-group maps get their groups from updateConfigurationForLocalityGroups.
++      Map<String,String> localityGroupConfig = newMapConfig(false);
++      // Native must be enabled here; otherwise this map is just a copy of localityGroupMap.
++      Map<String,String> localityGroupNativeConfig = newMapConfig(true);
++
++      defaultMap = new InMemoryMap(new ConfigurationCopy(defaultMapConfig));
++      nativeMapWrapper = new InMemoryMap(new ConfigurationCopy(nativeMapConfig));
++      localityGroupMap = new InMemoryMap(updateConfigurationForLocalityGroups(new ConfigurationCopy(localityGroupConfig)));
++      localityGroupMapWithNative = new InMemoryMap(updateConfigurationForLocalityGroups(new ConfigurationCopy(localityGroupNativeConfig)));
++    } catch (Exception e) {
++      log.error("Error getting new InMemoryMap ", e);
++      // Keep the original exception as the cause instead of only its message.
++      throw new AssertionError("Error getting new InMemoryMap: " + e.getMessage(), e);
++    }
++
++    defaultMap.mutate(mutations);
++    nativeMapWrapper.mutate(mutations);
++    localityGroupMap.mutate(mutations);
++    localityGroupMapWithNative.mutate(mutations);
++
++    // let's use the transitive property to assert all four are equivalent
++    assertMutatesEquivalent(mutations, defaultMap, nativeMapWrapper);
++    assertMutatesEquivalent(mutations, defaultMap, localityGroupMap);
++    assertMutatesEquivalent(mutations, defaultMap, localityGroupMapWithNative);
++  }
++
++  /**
++   * Builds a base InMemoryMap configuration with the native map toggled and a fresh memdump directory.
++   *
++   * @param nativeEnabled
++   *          value for TSERV_NATIVEMAP_ENABLED
++   * @return a mutable configuration map that callers may extend
++   * @throws IOException
++   *           if the temporary memdump directory cannot be created
++   */
++  private Map<String,String> newMapConfig(boolean nativeEnabled) throws IOException {
++    Map<String,String> config = new HashMap<>();
++    config.put(Property.TSERV_NATIVEMAP_ENABLED.getKey(), Boolean.toString(nativeEnabled));
++    config.put(Property.TSERV_MEMDUMP_DIR.getKey(), tempFolder.newFolder().getAbsolutePath());
++    return config;
++  }
++
++  /**
++   * Assert that a set of mutations mutate to equivalent map in both of the InMemoryMaps.
++   * <p>
++   * In this case, equivalent means 3 things.
++   * <ul>
++   * <li>The size of both maps generated is equal to the number of key value pairs in all mutations passed (checked directly for the first map and, via the
++   * next check, for the second)</li>
++   * <li>The size of the map generated from the first InMemoryMap equals the size of the map generated from the second</li>
++   * <li>Each key value pair in each mutated map has a unique id (kvCount)</li>
++   * </ul>
++   *
++   * @param mutations
++   *          List of mutations
++   * @param imm1
++   *          InMemoryMap to compare
++   * @param imm2
++   *          InMemoryMap to compare
++   */
++  private void assertMutatesEquivalent(List<Mutation> mutations, InMemoryMap imm1, InMemoryMap imm2) {
++    int mutationKVPairs = countKVPairs(mutations);
++
++    List<MemKey> memKeys1 = getArrayOfMemKeys(imm1);
++    List<MemKey> memKeys2 = getArrayOfMemKeys(imm2);
++
++    assertEquals("Not all key value pairs included: " + dumpInMemoryMap(imm1, memKeys1), mutationKVPairs, memKeys1.size());
++    assertEquals("InMemoryMaps differ in size: " + dumpInMemoryMap(imm1, memKeys1) + "\n" + dumpInMemoryMap(imm2, memKeys2), memKeys1.size(), memKeys2.size());
++    // Distinct kvCounts per entry mean no update was hidden behind another with the same id.
++    assertEquals("InMemoryMap did not have distinct kvCounts " + dumpInMemoryMap(imm1, memKeys1), mutationKVPairs, getUniqKVCount(memKeys1));
++    assertEquals("InMemoryMap did not have distinct kvCounts " + dumpInMemoryMap(imm2, memKeys2), mutationKVPairs, getUniqKVCount(memKeys2));
++  }
++
++  private int countKVPairs(List<Mutation> mutations) {
++    // Total key/value pairs across all mutations, as reported by Mutation.size().
++    int total = 0;
++    for (int idx = 0; idx < mutations.size(); idx++) {
++      total += mutations.get(idx).size();
++    }
++    return total;
++  }
++
++  private List<MemKey> getArrayOfMemKeys(InMemoryMap imm) {
++    SortedKeyValueIterator<Key,Value> iter = imm.compactionIterator();
++    List<MemKey> collected = new ArrayList<MemKey>();
++
++    try {
++      // Unbounded range, empty column-family set, non-inclusive: visits every entry.
++      iter.seek(new Range(), new ArrayList<ByteSequence>(), false);
++      for (; iter.hasTop(); iter.next()) {
++        // compactionIterator keys are expected to be MemKeys carrying their kvCount.
++        collected.add((MemKey) iter.getTopKey());
++      }
++    } catch (IOException ioe) {
++      log.error("Error getting memkeys", ioe);
++      throw new RuntimeException(ioe);
++    }
++
++    return collected;
++  }
++
++  private String dumpInMemoryMap(InMemoryMap map, List<MemKey> memkeys) {
++    // Header naming the SimpleMap flavor, then one indented line per MemKey.
++    StringBuilder out = new StringBuilder("InMemoryMap type ").append(map.getMapType()).append("\n");
++    for (MemKey key : memkeys) {
++      out.append("  ").append(key.toString()).append("\n");
++    }
++    return out.toString();
++  }
++
++  private int getUniqKVCount(List<MemKey> memKeys) {
++    // Number of distinct kvCount ids among the given keys.
++    ImmutableSet.Builder<Integer> distinct = ImmutableSet.builder();
++    for (MemKey key : memKeys) {
++      distinct.add(key.getKVCount());
++    }
++    return distinct.build().size();
++  }
++
++  /**
++   * Adds the locality groups from {@link #getLocalityGroups()} to the configuration: one
++   * "table.group.NAME" entry per group listing its column families, plus TABLE_LOCALITY_GROUPS
++   * enabling all of them.
++   *
++   * @param configuration
++   *          configuration to modify in place
++   * @return the same configuration instance, for chaining
++   */
++  private ConfigurationCopy updateConfigurationForLocalityGroups(ConfigurationCopy configuration) {
++    Map<String,Set<ByteSequence>> locGroups = getLocalityGroups();
++    StringBuilder enabledLGs = new StringBuilder();
++
++    for (Entry<String,Set<ByteSequence>> entry : locGroups.entrySet()) {
++      if (enabledLGs.length() > 0) {
++        enabledLGs.append(",");
++      }
++
++      StringBuilder value = new StringBuilder();
++      for (ByteSequence bytes : entry.getValue()) {
++        if (value.length() > 0) {
++          value.append(",");
++        }
++        // Decode with an explicit charset so the result does not depend on the JVM's platform default.
++        value.append(new String(bytes.toArray(), java.nio.charset.StandardCharsets.UTF_8));
++      }
++      configuration.set("table.group." + entry.getKey(), value.toString());
++      enabledLGs.append(entry.getKey());
++    }
++    configuration.set(Property.TABLE_LOCALITY_GROUPS, enabledLGs.toString());
++    return configuration;
++  }
++
++  /**
++   * Locality groups used by the locality-group InMemoryMaps: group "a" holds cf/cf2 and group "b"
++   * holds cf3/cf4.
++   *
++   * @return mutable map from group name to its column families
++   */
++  private Map<String,Set<ByteSequence>> getLocalityGroups() {
++    Map<String,Set<ByteSequence>> locgro = new HashMap<String,Set<ByteSequence>>();
++    locgro.put("a", newCFSet("cf", "cf2"));
++    // Distinct key: reusing "a" would overwrite the first group and configure only one.
++    locgro.put("b", newCFSet("cf3", "cf4"));
++    return locgro;
++  }
++
++  // from InMemoryMapTest
++  private Set<ByteSequence> newCFSet(String... cfs) {
++    // Wrap each column-family name in an ArrayByteSequence and collect into a set.
++    Set<ByteSequence> result = new HashSet<ByteSequence>();
++    for (int idx = 0; idx < cfs.length; idx++) {
++      result.add(new ArrayByteSequence(cfs[idx]));
++    }
++    return result;
++  }
++
++}


[4/6] accumulo git commit: Merge branch '1.6' into 1.7

Posted by el...@apache.org.
Merge branch '1.6' into 1.7

Conflicts:
	server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f181cf6a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f181cf6a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f181cf6a

Branch: refs/heads/master
Commit: f181cf6a90a913e5453352a56d3ab470f15b068c
Parents: 2b286ba 41e002d
Author: Josh Elser <el...@apache.org>
Authored: Fri Apr 1 09:56:17 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Apr 1 09:56:17 2016 -0400

----------------------------------------------------------------------
 .../apache/accumulo/tserver/InMemoryMap.java    |  28 +-
 .../org/apache/accumulo/tserver/MemKey.java     |  10 +-
 .../accumulo/tserver/MemKeyComparator.java      |   2 +-
 .../org/apache/accumulo/tserver/NativeMap.java  |  27 +-
 .../PartialMutationSkippingIterator.java        |   2 +-
 .../org/apache/accumulo/test/InMemoryMapIT.java | 319 +++++++++++++++++++
 6 files changed, 362 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f181cf6a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f181cf6a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f181cf6a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyComparator.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyComparator.java
index 739b923,0000000..a623cac
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyComparator.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyComparator.java
@@@ -1,44 -1,0 +1,44 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver;
 +
 +import java.io.Serializable;
 +import java.util.Comparator;
 +
 +import org.apache.accumulo.core.data.Key;
 +
 +class MemKeyComparator implements Comparator<Key>, Serializable {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  @Override
 +  public int compare(Key k1, Key k2) {
 +    int cmp = k1.compareTo(k2);
 +
 +    if (cmp == 0) {
 +      if (k1 instanceof MemKey)
 +        if (k2 instanceof MemKey)
-           cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount;
++          cmp = ((MemKey) k2).getKVCount() - ((MemKey) k1).getKVCount();
 +        else
 +          cmp = 1;
 +      else if (k2 instanceof MemKey)
 +        cmp = -1;
 +    }
 +
 +    return cmp;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f181cf6a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
index 6eb8e4e,7e1435e..a6f7cf1
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
@@@ -502,29 -531,17 +503,18 @@@ public class NativeMap implements Itera
        long uid = startUpdate(nmPointer, mutation.getRow());
        for (ColumnUpdate update : updates) {
          update(nmPointer, uid, update.getColumnFamily(), update.getColumnQualifier(), update.getColumnVisibility(), update.getTimestamp(), update.isDeleted(),
-             update.getValue(), mutationCount);
+             update.getValue(), mutationCount++);
        }
- 
      }
+     return mutationCount;
    }
  
 +  @VisibleForTesting
    public void mutate(Mutation mutation, int mutationCount) {
-     wlock.lock();
-     try {
-       if (nmPointer == 0) {
-         throw new IllegalStateException("Native Map Deleted");
-       }
- 
-       modCount++;
- 
-       _mutate(mutation, mutationCount);
-     } finally {
-       wlock.unlock();
-     }
+     mutate(Collections.singletonList(mutation), mutationCount);
    }
  
 -  public void mutate(List<Mutation> mutations, int mutationCount) {
 +  void mutate(List<Mutation> mutations, int mutationCount) {
      Iterator<Mutation> iter = mutations.iterator();
  
      while (iter.hasNext()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f181cf6a/server/tserver/src/main/java/org/apache/accumulo/tserver/PartialMutationSkippingIterator.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/PartialMutationSkippingIterator.java
index 5d0733b,0000000..3373c88
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/PartialMutationSkippingIterator.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/PartialMutationSkippingIterator.java
@@@ -1,54 -1,0 +1,54 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver;
 +
 +import java.io.IOException;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.SkippingIterator;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
 +
 +class PartialMutationSkippingIterator extends SkippingIterator implements InterruptibleIterator {
 +
 +  private int kvCount;
 +
 +  public PartialMutationSkippingIterator(SortedKeyValueIterator<Key,Value> source, int maxKVCount) {
 +    setSource(source);
 +    this.kvCount = maxKVCount;
 +  }
 +
 +  @Override
 +  protected void consume() throws IOException {
-     while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount)
++    while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).getKVCount() > kvCount)
 +      getSource().next();
 +  }
 +
 +  @Override
 +  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +    return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount);
 +  }
 +
 +  @Override
 +  public void setInterruptFlag(AtomicBoolean flag) {
 +    ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
 +  }
 +
 +}