Posted to commits@accumulo.apache.org by el...@apache.org on 2014/10/31 22:27:04 UTC

[05/12] git commit: ACCUMULO-3213 Make DeleteTable archive files if configured.

ACCUMULO-3213 Make DeleteTable archive files if configured.

Archiving a table directory is a little more complicated than deleting it,
since some of the table's files may already have been archived. Ultimately,
we merge the table directory into the corresponding directory inside the
file archive directory.
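
For illustration, here is a minimal sketch of the destination path
computation (the volume base path and table id are hypothetical, and
"file_archive" is assumed as the value of ServerConstants.FILE_ARCHIVE_DIR):

    import org.apache.hadoop.fs.Path;

    public class ArchivePathSketch {
      public static void main(String[] args) {
        // Suppose table 2's directory lives on a volume with base path /accumulo.
        Path tableDirectory = new Path("/accumulo/tables/2");
        String basePath = "/accumulo";

        // Suffix relative to the volume base: "tables/2". The patch strips the
        // leading separator more defensively than this one-liner does.
        String suffix = tableDirectory.toUri().getPath().substring(basePath.length() + 1);

        // Destination: /accumulo/file_archive/tables/2
        System.out.println(new Path(new Path(basePath, "file_archive"), suffix));
      }
    }

If the destination already exists (e.g. the garbage collector previously
archived some of this table's files), the two directories are merged rather
than renamed.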


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/43cee41a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/43cee41a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/43cee41a

Branch: refs/heads/1.6
Commit: 43cee41a0d816c1bc72141b4c47aaca43aba1eed
Parents: ded67f1
Author: Josh Elser <el...@apache.org>
Authored: Tue Oct 28 23:59:13 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Oct 31 14:59:47 2014 -0400

----------------------------------------------------------------------
 .../accumulo/master/tableOps/DeleteTable.java   | 156 ++++++++---
 .../org/apache/accumulo/test/FileArchiveIT.java | 273 +++++++++++++++++++
 2 files changed, 388 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/43cee41a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
index d264f14..f6db45b 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
@@ -17,8 +17,8 @@
 package org.apache.accumulo.master.tableOps;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.net.UnknownHostException;
+import java.util.Arrays;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.BatchScanner;
@@ -29,6 +29,8 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Range;
@@ -39,6 +41,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.volume.Volume;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.server.ServerConstants;
@@ -52,52 +55,53 @@ import org.apache.accumulo.server.security.AuditedSecurityOperation;
 import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.tables.TableManager;
 import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
 class CleanUp extends MasterRepo {
-  
+
   final private static Logger log = Logger.getLogger(CleanUp.class);
-  
+
   private static final long serialVersionUID = 1L;
-  
+
   private String tableId, namespaceId;
-  
+
   private long creationTime;
-  
+
   private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
     in.defaultReadObject();
-    
+
     /*
      * handle the case where we start executing on a new machine where the current time is in the past relative to the previous machine
-     * 
+     *
      * if the new machine has time in the future, that will work ok w/ hasCycled
      */
     if (System.currentTimeMillis() < creationTime) {
       creationTime = System.currentTimeMillis();
     }
-    
+
   }
-  
+
   public CleanUp(String tableId, String namespaceId) {
     this.tableId = tableId;
     this.namespaceId = namespaceId;
     creationTime = System.currentTimeMillis();
   }
-  
+
   @Override
   public long isReady(long tid, Master master) throws Exception {
     if (!master.hasCycled(creationTime)) {
       return 50;
     }
-    
+
     boolean done = true;
     Range tableRange = new KeyExtent(new Text(tableId), null, null).toMetadataRange();
     Scanner scanner = master.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
     MetaDataTableScanner.configureScanner(scanner, master);
     scanner.setRange(tableRange);
-    
+
     KeyExtent prevExtent = null;
     for (Entry<Key,Value> entry : scanner) {
       TabletLocationState locationState = MetaDataTableScanner.createTabletLocationState(entry.getKey(), entry.getValue());
@@ -107,7 +111,7 @@ class CleanUp extends MasterRepo {
         break;
       }
       prevExtent = locationState.extent;
-      
+
       TabletState state = locationState.getState(master.onlineTabletServers());
       if (state.equals(TabletState.ASSIGNED) || state.equals(TabletState.HOSTED)) {
         log.debug("Still waiting for table to be deleted: " + tableId + " locationState: " + locationState);
@@ -115,20 +119,20 @@ class CleanUp extends MasterRepo {
         break;
       }
     }
-    
+
     if (!done)
       return 50;
-    
+
     return 0;
   }
-  
+
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
-    
+
     master.clearMigrations(tableId);
-    
+
     int refCount = 0;
-    
+
     try {
       // look for other tables that references this table's files
       Connector conn = master.getConnector();
@@ -143,7 +147,7 @@ class CleanUp extends MasterRepo {
         IteratorSetting cfg = new IteratorSetting(40, "grep", GrepIterator.class);
         GrepIterator.setTerm(cfg, "/" + tableId + "/");
         bs.addScanIterator(cfg);
-        
+
         for (Entry<Key,Value> entry : bs) {
           if (entry.getKey().getColumnQualifier().toString().contains("/" + tableId + "/")) {
             refCount++;
@@ -152,12 +156,12 @@ class CleanUp extends MasterRepo {
       } finally {
         bs.close();
       }
-      
+
     } catch (Exception e) {
       refCount = -1;
       log.error("Failed to scan " + MetadataTable.NAME + " looking for references to deleted table " + tableId, e);
     }
-    
+
     // remove metadata table entries
     try {
       // Intentionally do not pass master lock. If master loses lock, this operation may complete before master can kill itself.
@@ -167,20 +171,27 @@ class CleanUp extends MasterRepo {
     } catch (Exception e) {
       log.error("error deleting " + tableId + " from metadata table", e);
     }
-    
+
     // remove any problem reports the table may have
     try {
       ProblemReports.getInstance().deleteProblemReports(tableId);
     } catch (Exception e) {
       log.error("Failed to delete problem reports for table " + tableId, e);
     }
-    
+
     if (refCount == 0) {
+      final AccumuloConfiguration conf = master.getConfiguration().getConfiguration();
+      boolean archiveFiles = conf.getBoolean(Property.GC_FILE_ARCHIVE);
+
       // delete the map files
       try {
         VolumeManager fs = master.getFileSystem();
         for (String dir : ServerConstants.getTablesDirs()) {
-          fs.deleteRecursively(new Path(dir, tableId));
+          if (archiveFiles) {
+            archiveFile(fs, dir, tableId);
+          } else {
+            fs.deleteRecursively(new Path(dir, tableId));
+          }
         }
       } catch (IOException e) {
         log.error("Unable to remove deleted table directory", e);
@@ -193,7 +204,7 @@ class CleanUp extends MasterRepo {
         }
       }
     }
-    
+
     // remove table from zookeeper
     try {
       TableManager.getInstance().removeTable(tableId);
@@ -201,59 +212,122 @@ class CleanUp extends MasterRepo {
     } catch (Exception e) {
       log.error("Failed to find table id in zookeeper", e);
     }
-    
+
     // remove any permissions associated with this table
     try {
       AuditedSecurityOperation.getInstance().deleteTable(SystemCredentials.get().toThrift(master.getInstance()), tableId, namespaceId);
     } catch (ThriftSecurityException e) {
       log.error(e.getMessage(), e);
     }
-    
+
     Utils.unreserveTable(tableId, tid, true);
     Utils.unreserveNamespace(namespaceId, tid, false);
-    
+
     Logger.getLogger(CleanUp.class).debug("Deleted table " + tableId);
-    
+
     return null;
   }
-  
+
+  protected void archiveFile(VolumeManager fs, String dir, String tableId) throws IOException {
+    Path tableDirectory = new Path(dir, tableId);
+    Volume v = fs.getVolumeByPath(tableDirectory);
+    String basePath = v.getBasePath();
+
+    // Path component of URI
+    String tableDirPath = tableDirectory.toUri().getPath();
+
+    // Just the suffix of the path (after the Volume's base path)
+    String tableDirSuffix = tableDirPath.substring(basePath.length());
+
+    // Remove a leading path separator char because Path will treat the "child" as an absolute path with it
+    if (Path.SEPARATOR_CHAR == tableDirSuffix.charAt(0)) {
+      if (tableDirSuffix.length() > 1) {
+        tableDirSuffix = tableDirSuffix.substring(1);
+      } else {
+        tableDirSuffix = "";
+      }
+    }
+
+    // Get the file archive directory on this volume
+    final Path fileArchiveDir = new Path(basePath, ServerConstants.FILE_ARCHIVE_DIR);
+
+    // Make sure it exists just to be safe
+    fs.mkdirs(fileArchiveDir);
+
+    // The destination to archive this table to
+    final Path destTableDir = new Path(fileArchiveDir, tableDirSuffix);
+
+    log.debug("Archiving " + tableDirectory + " to " + tableDirectory);
+
+    if (fs.exists(destTableDir)) {
+      merge(fs, tableDirectory, destTableDir);
+    } else {
+      fs.rename(tableDirectory, destTableDir);
+    }
+  }
+
+  protected void merge(VolumeManager fs, Path src, Path dest) throws IOException {
+    for (FileStatus child : fs.listStatus(src)) {
+      final String childName = child.getPath().getName();
+      final Path childInSrc = new Path(src, childName), childInDest = new Path(dest, childName);
+
+      if (child.isFile()) {
+        if (fs.exists(childInDest)) {
+          log.warn("File already exists in archive, ignoring. " + childInDest);
+        } else {
+          fs.rename(childInSrc, childInDest);
+        }
+      } else if (child.isDirectory()) {
+        if (fs.exists(childInDest)) {
+          // Recurse
+          merge(fs, childInSrc, childInDest);
+        } else {
+          fs.rename(childInSrc, childInDest);
+        }
+      } else {
+        // Symlinks shouldn't exist in table directories.
+        log.warn("Ignoring archiving of non-file/directory: " + child);
+      }
+    }
+  }
+
   @Override
   public void undo(long tid, Master environment) throws Exception {
     // nothing to do
   }
-  
+
 }
 
 public class DeleteTable extends MasterRepo {
-  
+
   private static final long serialVersionUID = 1L;
-  
+
   private String tableId, namespaceId;
-  
+
   public DeleteTable(String tableId) {
     this.tableId = tableId;
     Instance inst = HdfsZooInstance.getInstance();
     this.namespaceId = Tables.getNamespaceId(inst, tableId);
   }
-  
+
   @Override
   public long isReady(long tid, Master environment) throws Exception {
-    
+
     return Utils.reserveNamespace(namespaceId, tid, false, false, TableOperation.DELETE)
         + Utils.reserveTable(tableId, tid, true, true, TableOperation.DELETE);
   }
-  
+
   @Override
   public Repo<Master> call(long tid, Master environment) throws Exception {
     TableManager.getInstance().transitionTableState(tableId, TableState.DELETING);
     environment.getEventCoordinator().event("deleting table %s ", tableId);
     return new CleanUp(tableId, namespaceId);
   }
-  
+
   @Override
   public void undo(long tid, Master environment) throws Exception {
     Utils.unreserveNamespace(namespaceId, tid, false);
     Utils.unreserveTable(tableId, tid, true);
   }
-  
+
 }
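
To see the merge rules in isolation, here is a self-contained sketch that
mirrors the collision handling in CleanUp.merge() against the local
filesystem (the /tmp paths are hypothetical): a file that already exists in
the archive wins, a directory present on both sides is merged recursively,
and anything else is simply renamed into place.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MergeSketch {
      static void merge(FileSystem fs, Path src, Path dest) throws Exception {
        for (FileStatus child : fs.listStatus(src)) {
          Path from = child.getPath(), to = new Path(dest, from.getName());
          if (child.isFile()) {
            if (fs.exists(to)) {
              System.out.println("Keeping existing archived copy: " + to);
            } else {
              fs.rename(from, to);
            }
          } else if (child.isDirectory()) {
            if (fs.exists(to)) {
              merge(fs, from, to); // both sides have this directory: recurse
            } else {
              fs.rename(from, to);
            }
          }
        }
      }

      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path src = new Path("/tmp/merge-demo/src"), dest = new Path("/tmp/merge-demo/dest");
        fs.mkdirs(src);
        fs.mkdirs(dest);
        fs.create(new Path(src, "a.rf")).close();
        fs.create(new Path(dest, "a.rf")).close(); // collision: the archived copy is kept
        fs.create(new Path(src, "b.rf")).close();  // no collision: moved into the archive
        merge(fs, src, dest);
        System.out.println("b.rf archived: " + fs.exists(new Path(dest, "b.rf")));
      }
    }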

http://git-wip-us.apache.org/repos/asf/accumulo/blob/43cee41a/test/src/test/java/org/apache/accumulo/test/FileArchiveIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/FileArchiveIT.java b/test/src/test/java/org/apache/accumulo/test/FileArchiveIT.java
new file mode 100644
index 0000000..2e45d80
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/FileArchiveIT.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Tests that files are archived instead of deleted when configured.
+ */
+public class FileArchiveIT extends ConfigurableMacIT {
+
+  @Override
+  public int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
+    cfg.setProperty(Property.GC_FILE_ARCHIVE, "true");
+    cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
+    cfg.setProperty(Property.GC_CYCLE_START, "1s");
+  }
+
+  @Test
+  public void testUnusedFilesAreArchived() throws Exception {
+    final Connector conn = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+
+    conn.tableOperations().create(tableName);
+
+    final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+    Assert.assertNotNull("Could not get table ID", tableId);
+
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    Mutation m = new Mutation("row");
+    m.put("", "", "value");
+    bw.addMutation(m);
+    bw.close();
+
+    // Compact memory to disk
+    conn.tableOperations().compact(tableName, null, null, true, true);
+
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+
+    Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+    final String file = entry.getKey().getColumnQualifier().toString();
+    final Path p = new Path(file);
+
+    // Then force another to make an unreferenced file
+    conn.tableOperations().compact(tableName, null, null, true, true);
+
+    log.info("File for table: " + file);
+
+    FileSystem fs = LocalFileSystem.get(CachedConfiguration.getInstance());
+    int i = 0;
+    while (fs.exists(p)) {
+      i++;
+      Thread.sleep(1000);
+      if (0 == i % 10) {
+        log.info("Waited " + i + " iterations, file still exists");
+      }
+    }
+
+    log.info("File was removed");
+
+    String filePath = p.toUri().getPath().substring(getCluster().getConfig().getAccumuloDir().toString().length());
+
+    log.info("File relative to accumulo dir: " + filePath);
+
+    Path fileArchiveDir = new Path(getCluster().getConfig().getAccumuloDir().toString(), ServerConstants.FILE_ARCHIVE_DIR);
+
+    Assert.assertTrue("File archive directory didn't exist", fs.exists(fileArchiveDir));
+
+    // Remove the leading '/' to make sure Path treats the 2nd arg as a child.
+    Path archivedFile = new Path(fileArchiveDir, filePath.substring(1));
+
+    Assert.assertTrue("File doesn't exists in archive directory: " + archivedFile, fs.exists(archivedFile));
+  }
+
+  @Test
+  public void testDeletedTableIsArchived() throws Exception {
+    final Connector conn = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+
+    conn.tableOperations().create(tableName);
+
+    final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+    Assert.assertNotNull("Could not get table ID", tableId);
+
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    Mutation m = new Mutation("row");
+    m.put("", "", "value");
+    bw.addMutation(m);
+    bw.close();
+
+    // Compact memory to disk
+    conn.tableOperations().compact(tableName, null, null, true, true);
+
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+
+    Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+    final String file = entry.getKey().getColumnQualifier().toString();
+    final Path p = new Path(file);
+
+    conn.tableOperations().delete(tableName);
+
+    log.info("File for table: " + file);
+
+    FileSystem fs = LocalFileSystem.get(CachedConfiguration.getInstance());
+    int i = 0;
+    while (fs.exists(p)) {
+      i++;
+      Thread.sleep(1000);
+      if (0 == i % 10) {
+        log.info("Waited " + i + " iterations, file still exists");
+      }
+    }
+
+    log.info("File was removed");
+
+    String filePath = p.toUri().getPath().substring(getCluster().getConfig().getAccumuloDir().toString().length());
+
+    log.info("File relative to accumulo dir: " + filePath);
+
+    Path fileArchiveDir = new Path(getCluster().getConfig().getAccumuloDir().toString(), ServerConstants.FILE_ARCHIVE_DIR);
+
+    Assert.assertTrue("File archive directory didn't exist", fs.exists(fileArchiveDir));
+
+    // Remove the leading '/' to make sure Path treats the 2nd arg as a child.
+    Path archivedFile = new Path(fileArchiveDir, filePath.substring(1));
+
+    Assert.assertTrue("File doesn't exists in archive directory: " + archivedFile, fs.exists(archivedFile));
+  }
+
+  @Test
+  public void testUnusedFilesAndDeletedTable() throws Exception {
+    final Connector conn = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+
+    conn.tableOperations().create(tableName);
+
+    final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+    Assert.assertNotNull("Could not get table ID", tableId);
+
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    Mutation m = new Mutation("row");
+    m.put("", "", "value");
+    bw.addMutation(m);
+    bw.close();
+
+    // Compact memory to disk
+    conn.tableOperations().compact(tableName, null, null, true, true);
+
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+
+    Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+    final String file = entry.getKey().getColumnQualifier().toString();
+    final Path p = new Path(file);
+
+    // Then force another to make an unreferenced file
+    conn.tableOperations().compact(tableName, null, null, true, true);
+
+    log.info("File for table: " + file);
+
+    FileSystem fs = LocalFileSystem.get(CachedConfiguration.getInstance());
+    int i = 0;
+    while (fs.exists(p)) {
+      i++;
+      Thread.sleep(1000);
+      if (0 == i % 10) {
+        log.info("Waited " + i + " iterations, file still exists");
+      }
+    }
+
+    log.info("File was removed");
+
+    String filePath = p.toUri().getPath().substring(getCluster().getConfig().getAccumuloDir().toString().length());
+
+    log.info("File relative to accumulo dir: " + filePath);
+
+    Path fileArchiveDir = new Path(getCluster().getConfig().getAccumuloDir().toString(), ServerConstants.FILE_ARCHIVE_DIR);
+
+    Assert.assertTrue("File archive directory didn't exist", fs.exists(fileArchiveDir));
+
+    // Remove the leading '/' to make sure Path treats the 2nd arg as a child.
+    Path archivedFile = new Path(fileArchiveDir, filePath.substring(1));
+
+    Assert.assertTrue("File doesn't exists in archive directory: " + archivedFile, fs.exists(archivedFile));
+
+    // Offline the table so we can be sure there is a single file
+    conn.tableOperations().offline(tableName, true);
+
+    // Find the file currently referenced in the metadata table
+    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+
+    entry = Iterables.getOnlyElement(s);
+    final String finalFile = entry.getKey().getColumnQualifier().toString();
+    final Path finalPath = new Path(finalFile);
+
+    conn.tableOperations().delete(tableName);
+
+    log.info("File for table: " + finalPath);
+
+    i = 0;
+    while (fs.exists(finalPath)) {
+      i++;
+      Thread.sleep(1000);
+      if (0 == i % 10) {
+        log.info("Waited " + i + " iterations, file still exists");
+      }
+    }
+
+    log.info("File was removed");
+
+    String finalFilePath = finalPath.toUri().getPath().substring(getCluster().getConfig().getAccumuloDir().toString().length());
+
+    log.info("File relative to accumulo dir: " + finalFilePath);
+
+    Assert.assertTrue("File archive directory didn't exist", fs.exists(fileArchiveDir));
+
+    // Remove the leading '/' to make sure Path treats the 2nd arg as a child.
+    Path finalArchivedFile = new Path(fileArchiveDir, finalFilePath.substring(1));
+
+    Assert.assertTrue("File doesn't exists in archive directory: " + finalArchivedFile, fs.exists(finalArchivedFile));
+  }
+
+}
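
The tests above enable archiving programmatically with
cfg.setProperty(Property.GC_FILE_ARCHIVE, "true"). On a standalone instance
the equivalent would be a site configuration entry (assuming the 1.6
property name gc.file.archive):

    <property>
      <name>gc.file.archive</name>
      <value>true</value>
    </property>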