You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/08/22 21:18:53 UTC

git commit: ACCUMULO-3077 Set StatusCombiner on accumulo.metadata during initialize.

Repository: accumulo
Updated Branches:
  refs/heads/master 5ef4da5c6 -> 176541991


ACCUMULO-3077 Set StatusCombiner on accumulo.metadata during initialize.

Before mutations are written to the metadata table to
track replication, the configuration for the metadata
table is checked to ensure that the Combiner is set
(to make sure that the state is appropriately managed).

Because we cache ZK data and update it via Watchers, it's
possible that a tserver compacts some data after it was written
but before seeing that the Combiner was set, effectively destroying
the older versions (VersioningIterator).

Setting the Combiner at init for the metadata table
does a better job at eliminating this possibility.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/17654199
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/17654199
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/17654199

Branch: refs/heads/master
Commit: 176541991c430239e09e06e199d01feb671e39d7
Parents: 5ef4da5
Author: Josh Elser <el...@apache.org>
Authored: Fri Aug 22 13:58:52 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Aug 22 15:18:42 2014 -0400

----------------------------------------------------------------------
 .../apache/accumulo/server/init/Initialize.java | 30 ++++++++++++++--
 .../server/util/ReplicationTableUtil.java       |  4 +--
 .../server/replication/StatusCombinerTest.java  | 25 +++++++++++++-
 .../tserver/log/TabletServerLogger.java         |  4 +--
 .../tserver/tablet/DatafileManager.java         | 25 ++++++++------
 .../test/replication/StatusCombinerMacTest.java | 36 +++++++++++++++++++-
 6 files changed, 104 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/17654199/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 0a681c4..9b952ba 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -20,6 +20,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Locale;
@@ -31,6 +32,8 @@ import jline.console.ConsoleReader;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.cli.Help;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.IteratorSetting.Column;
 import org.apache.accumulo.core.client.impl.Namespaces;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -41,6 +44,8 @@ import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.iterators.Combiner;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.master.thrift.MasterGoalState;
@@ -65,10 +70,12 @@ import org.apache.accumulo.server.constraints.MetadataConstraints;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
+import org.apache.accumulo.server.replication.StatusCombiner;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
 import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.tables.TableManager;
 import org.apache.accumulo.server.tablets.TabletTime;
+import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.accumulo.server.util.TablePropUtil;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.conf.Configuration;
@@ -84,7 +91,7 @@ import com.beust.jcommander.Parameter;
 
 /**
  * This class is used to setup the directory structure and the root tablet to get an instance started
- * 
+ *
  */
 public class Initialize {
   private static final Logger log = Logger.getLogger(Initialize.class);
@@ -102,7 +109,7 @@ public class Initialize {
 
   /**
    * Sets this class's ZooKeeper reader/writer.
-   * 
+   *
    * @param izoo
    *          reader/writer
    */
@@ -112,7 +119,7 @@ public class Initialize {
 
   /**
    * Gets this class's ZooKeeper reader/writer.
-   * 
+   *
    * @return reader/writer
    */
   static IZooReaderWriter getZooReaderWriter() {
@@ -566,6 +573,23 @@ public class Initialize {
   protected static void initMetadataConfig() throws IOException {
     initMetadataConfig(RootTable.ID);
     initMetadataConfig(MetadataTable.ID);
+
+    // ACCUMULO-3077 Set the combiner on accumulo.metadata during init to reduce the likelihood of a race
+    // condition where a tserver compacts away Status updates because it didn't see the Combiner configured
+    IteratorSetting setting = new IteratorSetting(9, ReplicationTableUtil.COMBINER_NAME, StatusCombiner.class);
+    Combiner.setColumns(setting, Collections.singletonList(new Column(MetadataSchema.ReplicationSection.COLF)));
+    try {
+      for (IteratorScope scope : IteratorScope.values()) {
+        String root = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, scope.name().toLowerCase(), setting.getName());
+        for (Entry<String,String> prop : setting.getOptions().entrySet()) {
+          TablePropUtil.setTableProperty(MetadataTable.ID, root + ".opt." + prop.getKey(), prop.getValue());
+        }
+        TablePropUtil.setTableProperty(MetadataTable.ID, root, setting.getPriority() + "," + setting.getIteratorClass());
+      }
+    } catch (Exception e) {
+      log.fatal("Error talking to ZooKeeper", e);
+      throw new IOException(e);
+    }
   }
 
   private static void setMetadataReplication(int replication, String reason) throws IOException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/17654199/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
index 2a9774d..ab5ee86 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
@@ -72,7 +72,7 @@ public class ReplicationTableUtil {
    * For testing purposes only -- should not be called by server code
    * <p>
    * Allows mocking of a Writer for testing
-   * 
+   *
    * @param creds
    *          Credentials
    * @param writer
@@ -187,7 +187,7 @@ public class ReplicationTableUtil {
    */
   public static void updateFiles(Credentials creds, KeyExtent extent, Collection<String> files, Status stat) {
     if (log.isDebugEnabled()) {
-      log.debug("Updating replication for " + extent + " with " + files + " using " + ProtobufUtil.toString(stat));
+      log.debug("Updating replication status for " + extent + " with " + files + " using " + ProtobufUtil.toString(stat));
     }
     // TODO could use batch writer, would need to handle failure and retry like update does - ACCUMULO-1294
     if (files.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/17654199/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java b/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
index 5f7e457..a9801b0 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
@@ -17,8 +17,10 @@
 package org.apache.accumulo.server.replication;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.IteratorSetting.Column;
@@ -38,7 +40,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 /**
- * 
+ *
  */
 public class StatusCombinerTest {
 
@@ -55,22 +57,27 @@ public class StatusCombinerTest {
     Combiner.setColumns(cfg, Collections.singletonList(new Column(StatusSection.NAME)));
     combiner.init(new DevNull(), cfg.getOptions(), new IteratorEnvironment() {
 
+      @Override
       public AccumuloConfiguration getConfig() {
         return null;
       }
 
+      @Override
       public IteratorScope getIteratorScope() {
         return null;
       }
 
+      @Override
       public boolean isFullMajorCompaction() {
         return false;
       }
 
+      @Override
       public void registerSideChannel(SortedKeyValueIterator<Key,Value> arg0) {
 
       }
 
+      @Override
       public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String arg0) throws IOException {
         return null;
       }
@@ -252,4 +259,20 @@ public class StatusCombinerTest {
 
     Assert.assertEquals(combined, combined2);
   }
+
+  @Test
+  public void testCombination() {
+    List<Status> status = new ArrayList<>();
+    long time = System.currentTimeMillis();
+
+    status.add(StatusUtil.fileCreated(time));
+    status.add(StatusUtil.openWithUnknownLength());
+    status.add(StatusUtil.fileClosed());
+
+    Status combined = combiner.typedReduce(new Key("row"), status.iterator());
+
+    Assert.assertEquals(time, combined.getCreatedTime());
+    Assert.assertTrue(combined.getInfiniteEnd());
+    Assert.assertTrue(combined.getClosed());
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/17654199/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index b4f14ec..26e6891 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -276,8 +276,8 @@ public class TabletServerLogger {
                   logs.add(logger.getFileName());
                 }
                 Status status = StatusUtil.fileCreated(System.currentTimeMillis());
-                log.debug("Writing " + ProtobufUtil.toString(status) + " to replication table for " + logs);
-                // Got some new WALs, note this in the replication table
+                log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " + logs);
+                // Got some new WALs, note this in the metadata table
                 ReplicationTableUtil.updateFiles(SystemCredentials.get(), commitSession.getExtent(), logs, status);
               }
             }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/17654199/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index 5b46b7b..78a2ed6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -61,7 +61,7 @@ class DatafileManager {
   // access to datafilesizes needs to be synchronized: see CompactionRunner#getNumFiles
   private final Map<FileRef,DataFileValue> datafileSizes = Collections.synchronizedMap(new TreeMap<FileRef,DataFileValue>());
   private final Tablet tablet;
-  
+
   // ensure we only have one reader/writer of our bulk file notes at at time
   private final Object bulkFileImportLock = new Object();
 
@@ -80,7 +80,7 @@ class DatafileManager {
   private boolean reservationsBlocked = false;
 
   private final Set<FileRef> majorCompactingFiles = new HashSet<FileRef>();
-  
+
   static void rename(VolumeManager fs, Path src, Path dst) throws IOException {
     if (!fs.rename(src, dst)) {
       throw new IOException("Rename " + src + " to " + dst + " returned false ");
@@ -268,7 +268,7 @@ class DatafileManager {
             dfv.setTime(bulkTime);
           }
         }
-        
+
         tablet.updatePersistedTime(bulkTime, paths, tid);
       }
     }
@@ -424,6 +424,9 @@ class DatafileManager {
       // This WAL could still be in use by other Tablets *from the same table*, so we can only mark that there is data to replicate,
       // but it is *not* closed
       if (replicate) {
+        if (log.isDebugEnabled()) {
+          log.debug("Recording that data has been ingested into " + tablet.getExtent() + " using " + logFileOnly);
+        }
         ReplicationTableUtil.updateFiles(SystemCredentials.get(), tablet.getExtent(), logFileOnly, StatusUtil.openWithUnknownLength());
       }
     } finally {
@@ -434,7 +437,7 @@ class DatafileManager {
       try {
         // the purpose of making this update use the new commit session, instead of the old one passed in,
         // is because the new one will reference the logs used by current memory...
-        
+
         tablet.getTabletServer().minorCompactionFinished(tablet.getTabletMemory().getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2);
         break;
       } catch (IOException e) {
@@ -449,19 +452,19 @@ class DatafileManager {
       if (datafileSizes.containsKey(newDatafile)) {
         log.error("Adding file that is already in set " + newDatafile);
       }
-      
+
       if (dfv.getNumEntries() > 0) {
         datafileSizes.put(newDatafile, dfv);
       }
-      
+
       if (absMergeFile != null) {
         datafileSizes.remove(absMergeFile);
       }
-      
+
       unreserveMergingMinorCompactionFile(absMergeFile);
-      
+
       tablet.flushComplete(flushId);
-      
+
       t2 = System.currentTimeMillis();
     }
 
@@ -597,9 +600,9 @@ class DatafileManager {
       return Collections.unmodifiableSet(files);
     }
   }
-  
+
   public int getNumFiles() {
     return datafileSizes.size();
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/17654199/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacTest.java b/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacTest.java
index e264488..396df20 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacTest.java
@@ -16,21 +16,30 @@
  */
 package org.apache.accumulo.test.replication;
 
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.accumulo.test.functional.SimpleMacIT;
 import org.apache.hadoop.io.Text;
 import org.junit.Assert;
@@ -39,11 +48,36 @@ import org.junit.Test;
 import com.google.common.collect.Iterables;
 
 /**
- * 
+ *
  */
 public class StatusCombinerMacTest extends SimpleMacIT {
 
   @Test
+  public void testCombinerSetOnMetadata() throws Exception {
+    TableOperations tops = getConnector().tableOperations();
+    Map<String,EnumSet<IteratorScope>> iterators = tops.listIterators(MetadataTable.NAME);
+
+    Assert.assertTrue(iterators.containsKey(ReplicationTableUtil.COMBINER_NAME));
+    EnumSet<IteratorScope> scopes = iterators.get(ReplicationTableUtil.COMBINER_NAME);
+    Assert.assertEquals(3, scopes.size());
+    Assert.assertTrue(scopes.contains(IteratorScope.scan));
+    Assert.assertTrue(scopes.contains(IteratorScope.minc));
+    Assert.assertTrue(scopes.contains(IteratorScope.majc));
+
+    Iterable<Entry<String,String>> propIter = tops.getProperties(MetadataTable.NAME);
+    HashMap<String,String> properties = new HashMap<String,String>();
+    for (Entry<String,String> entry : propIter) {
+      properties.put(entry.getKey(), entry.getValue());
+    }
+
+    for (IteratorScope scope : scopes) {
+      String key = Property.TABLE_ITERATOR_PREFIX.getKey() + scope.name() + "." + ReplicationTableUtil.COMBINER_NAME + ".opt.columns";
+      Assert.assertTrue("Properties did not contain key : " + key, properties.containsKey(key));
+      Assert.assertEquals(MetadataSchema.ReplicationSection.COLF.toString(), properties.get(key));
+    }
+  }
+
+  @Test
   public void test() throws Exception {
     Connector conn = getConnector();
     if (conn.tableOperations().exists(ReplicationTable.NAME)) {