You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2014/01/15 21:43:23 UTC

git commit: ACCUMULO-2195 fixed bulk import w/ viewfs

Updated Branches:
  refs/heads/1.6.0-SNAPSHOT 1496c5f32 -> 82437b1db


ACCUMULO-2195 fixed bulk import w/ viewfs


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/82437b1d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/82437b1d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/82437b1d

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 82437b1dbf070cddb11d410a33c0154f1d700c05
Parents: 1496c5f
Author: Keith Turner <kt...@apache.org>
Authored: Wed Jan 15 15:40:15 2014 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Wed Jan 15 15:48:07 2014 -0500

----------------------------------------------------------------------
 .../apache/accumulo/server/fs/ViewFSUtils.java  |  59 ++++++++++-
 .../accumulo/server/fs/VolumeManagerImpl.java   |   8 ++
 .../accumulo/server/util/MetadataTableUtil.java |  10 +-
 .../accumulo/server/fs/ViewFSUtilsTest.java     | 100 +++++++++++++++++++
 .../accumulo/master/tableOps/BulkImport.java    |   2 +-
 5 files changed, 168 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/82437b1d/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java b/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java
index ae7a8ae..34912f3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -27,8 +28,24 @@ import org.apache.hadoop.fs.Path;
  * 
  */
 public class ViewFSUtils {
+
+  public static final String VIEWFS_CLASSNAME = "org.apache.hadoop.fs.viewfs.ViewFileSystem";
+
+  public static boolean isViewFSSupported() {
+    try {
+      Class.forName(VIEWFS_CLASSNAME);
+      return true;
+    } catch (ClassNotFoundException e) {
+      return false;
+    }
+  }
+
+  public static boolean isViewFS(Path source, Configuration conf) throws IOException {
+    return isViewFS(source.getFileSystem(conf));
+  }
+
   public static boolean isViewFS(FileSystem fs) {
-    return fs.getClass().getName().equals("org.apache.hadoop.fs.viewfs.ViewFileSystem");
+    return fs.getClass().getName().equals(VIEWFS_CLASSNAME);
   }
 
   public static Path resolvePath(FileSystem fs, Path path) throws IOException {
@@ -48,4 +65,44 @@ public class ViewFSUtils {
       throw new IOException(e);
     }
   }
+
+  public static Path matchingFileSystem(Path source, String[] options, Configuration conf) throws IOException {
+
+    if (!isViewFS(source, conf))
+      throw new IllegalArgumentException("source " + source + " is not view fs");
+
+    String sourceUriPath = source.toUri().getPath();
+
+    Path match = null;
+    int matchPrefixLen = 0;
+
+    // find the option with the longest common path prefix
+    for (String option : options) {
+      Path optionPath = new Path(option);
+      if (isViewFS(optionPath, conf)) {
+        String optionUriPath = optionPath.toUri().getPath();
+
+        int commonPrefixLen = 0;
+        for (int i = 0; i < Math.min(sourceUriPath.length(), optionUriPath.length()); i++) {
+          if (sourceUriPath.charAt(i) == optionUriPath.charAt(i)) {
+            if (sourceUriPath.charAt(i) == '/')
+              commonPrefixLen++;
+          } else {
+            break;
+          }
+        }
+
+        if (commonPrefixLen > matchPrefixLen) {
+          matchPrefixLen = commonPrefixLen;
+          match = optionPath;
+        } else if (match != null && commonPrefixLen == matchPrefixLen && optionPath.depth() < match.depth()) {
+          // take the path with less depth when the match prefix length is the same
+          match = optionPath;
+        }
+      }
+    }
+
+    return match;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/82437b1d/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index 3e9cc26..eb7a330 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@ -439,6 +439,14 @@ public class VolumeManagerImpl implements VolumeManager {
 
   @Override
   public Path matchingFileSystem(Path source, String[] options) {
+    try {
+      if (ViewFSUtils.isViewFS(source, CachedConfiguration.getInstance())) {
+        return ViewFSUtils.matchingFileSystem(source, options, CachedConfiguration.getInstance());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
     URI uri1 = source.toUri();
     for (String option : options) {
       URI uri3 = URI.create(option);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/82437b1d/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 26444e0..8a2fe3b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -77,7 +77,6 @@ import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.zookeeper.ZooLock;
@@ -271,14 +270,7 @@ public class MetadataTableUtil {
   }
 
   public static Mutation createDeleteMutation(String tableId, String pathToRemove) throws IOException {
-    if (!pathToRemove.contains(":")) {
-      if (pathToRemove.startsWith("../"))
-        pathToRemove = pathToRemove.substring(2);
-      else
-        pathToRemove = "/" + tableId + pathToRemove;
-    }
-
-    Path path = VolumeManagerImpl.get().getFullPath(FileType.TABLE, pathToRemove);
+    Path path = VolumeManagerImpl.get().getFullPath(tableId, pathToRemove);
     Mutation delFlag = new Mutation(new Text(MetadataSchema.DeletesSection.getRowPrefix() + path.toString()));
     delFlag.put(EMPTY_TEXT, EMPTY_TEXT, new Value(new byte[] {}));
     return delFlag;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/82437b1d/server/base/src/test/java/org/apache/accumulo/server/fs/ViewFSUtilsTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/fs/ViewFSUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/fs/ViewFSUtilsTest.java
new file mode 100644
index 0000000..52c70c0
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/fs/ViewFSUtilsTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * 
+ */
+public class ViewFSUtilsTest {
+  
+  private String[] shuffle(String ... inputs){
+    // code below will modify array
+    Collections.shuffle(Arrays.asList(inputs));
+    return inputs;
+  }
+  
+  @Test
+  public void testDisjointMountPoints() throws IllegalArgumentException, IOException {
+    if (ViewFSUtils.isViewFSSupported()) {
+      Configuration conf = new Configuration(false);
+      conf.set("fs.viewfs.mounttable.default.link./ns", "file:///tmp/ns");
+      conf.set("fs.viewfs.mounttable.default.link./ns1", "file:///tmp/ns1");
+      conf.set("fs.viewfs.mounttable.default.link./ns2", "file:///tmp/ns2");
+      conf.set("fs.viewfs.mounttable.default.link./ns22", "file:///tmp/ns22");
+
+      String[] tablesDirs1 = shuffle("viewfs:///ns1/accumulo/tables", "viewfs:///ns2/accumulo/tables", "viewfs:///ns22/accumulo/tables",
+          "viewfs:///ns/accumulo/tables");
+      String[] tablesDirs2 = shuffle("viewfs:/ns1/accumulo/tables", "viewfs:/ns2/accumulo/tables", "viewfs:/ns22/accumulo/tables",
+          "viewfs:/ns/accumulo/tables");
+
+      for (String ns : Arrays.asList("ns1", "ns2", "ns22", "ns")) {
+        Path match = ViewFSUtils.matchingFileSystem(new Path("viewfs:/" + ns + "/bulk_import_01"), tablesDirs2, conf);
+        Assert.assertEquals(new Path("viewfs:/" + ns + "/accumulo/tables"), match);
+
+        match = ViewFSUtils.matchingFileSystem(new Path("viewfs:///" + ns + "/bulk_import_01"), tablesDirs1, conf);
+        Assert.assertEquals(new Path("viewfs:/" + ns + "/accumulo/tables"), match);
+
+        match = ViewFSUtils.matchingFileSystem(new Path("viewfs:/" + ns + "/bulk_import_01"), tablesDirs2, conf);
+        Assert.assertEquals(new Path("viewfs:/" + ns + "/accumulo/tables"), match);
+
+        match = ViewFSUtils.matchingFileSystem(new Path("viewfs:///" + ns + "/bulk_import_01"), tablesDirs1, conf);
+        Assert.assertEquals(new Path("viewfs:/" + ns + "/accumulo/tables"), match);
+      }
+    }
+  }
+
+  @Test
+  public void testOverlappingMountPoints() throws IllegalArgumentException, IOException {
+    if (ViewFSUtils.isViewFSSupported()) {
+      Configuration conf = new Configuration(false);
+      conf.set("fs.viewfs.mounttable.default.link./", "file:///tmp/0");
+      conf.set("fs.viewfs.mounttable.default.link./ns1", "file:///tmp/1");
+      conf.set("fs.viewfs.mounttable.default.link./ns1/A", "file:///tmp/2");
+      conf.set("fs.viewfs.mounttable.default.link./ns1/AA", "file:///tmp/3");
+      conf.set("fs.viewfs.mounttable.default.link./ns1/C", "file:///tmp/3");
+      conf.set("fs.viewfs.mounttable.default.link./ns2", "file:///tmp/3");
+
+      String[] tablesDirs1 = shuffle("viewfs:///ns1/accumulo/tables", "viewfs:///ns1/A/accumulo/tables", "viewfs:///ns1/AA/accumulo/tables",
+          "viewfs:///ns1/C/accumulo/tables", "viewfs:///ns2/accumulo/tables", "viewfs:///accumulo/tables");
+      String[] tablesDirs2 = shuffle("viewfs:/ns1/accumulo/tables", "viewfs:/ns1/A/accumulo/tables", "viewfs:/ns1/AA/accumulo/tables",
+          "viewfs:/ns1/C/accumulo/tables", "viewfs:/ns2/accumulo/tables", "viewfs:/accumulo/tables");
+
+      for (String ns : Arrays.asList("", "/ns1", "/ns1/A", "/ns1/AA", "/ns1/C", "/ns2")) {
+        Path match = ViewFSUtils.matchingFileSystem(new Path("viewfs:" + ns + "/bulk_import_01"), tablesDirs2, conf);
+        Assert.assertEquals(new Path("viewfs:" + ns + "/accumulo/tables"), match);
+
+        match = ViewFSUtils.matchingFileSystem(new Path("viewfs://" + ns + "/bulk_import_01"), tablesDirs1, conf);
+        Assert.assertEquals(new Path("viewfs:" + ns + "/accumulo/tables"), match);
+
+        match = ViewFSUtils.matchingFileSystem(new Path("viewfs:" + ns + "/bulk_import_01"), tablesDirs2, conf);
+        Assert.assertEquals(new Path("viewfs:" + ns + "/accumulo/tables"), match);
+
+        match = ViewFSUtils.matchingFileSystem(new Path("viewfs://" + ns + "/bulk_import_01"), tablesDirs1, conf);
+        Assert.assertEquals(new Path("viewfs:" + ns + "/accumulo/tables"), match);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/82437b1d/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index 5b85a14..1388c70 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@ -305,7 +305,7 @@ class CleanUpBulkImport extends MasterRepo {
     log.debug("removing the bulk processing flag file in " + bulk);
     Path bulkDir = new Path(bulk);
     MetadataTableUtil.removeBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
-    MetadataTableUtil.addDeleteEntry(tableId, "/" + bulkDir.getName());
+    MetadataTableUtil.addDeleteEntry(tableId, bulkDir.toString());
     log.debug("removing the metadata table markers for loaded files");
     Connector conn = master.getConnector();
     MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid);