You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/03/21 03:13:19 UTC

[16/25] git commit: ACCUMULO-2061 Use URI instead of FileSystem as the key to find the correct Volumes and ensure that absolute URIs are still valid even after their volumes are no longer configured.

ACCUMULO-2061 Use URI instead of FileSystem as the key to find the correct Volumes and ensure that absolute URIs
are still valid even after their volumes are no longer configured.

This helps ensure that FileSystem implementations' hashCode and equals cannot
cause collisions, while still providing unique access back to the Volumes
contained in each FileSystem. Added tests for NonConfiguredVolume and also
for volumes that are no longer configured.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/42f6b58e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/42f6b58e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/42f6b58e

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 42f6b58e58a8fb1a8f05c371844f86c536fe143a
Parents: 492768d
Author: Josh Elser <el...@apache.org>
Authored: Fri Mar 14 17:06:32 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Mar 20 18:59:30 2014 -0400

----------------------------------------------------------------------
 .../core/volume/NonConfiguredVolume.java        | 92 ++++++++++++++++++++
 .../core/volume/NonConfiguredVolumeTest.java    | 71 +++++++++++++++
 .../accumulo/server/fs/VolumeManagerImpl.java   | 25 +++---
 .../java/org/apache/accumulo/test/VolumeIT.java | 55 +++++++++++-
 4 files changed, 231 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/42f6b58e/core/src/main/java/org/apache/accumulo/core/volume/NonConfiguredVolume.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/volume/NonConfiguredVolume.java b/core/src/main/java/org/apache/accumulo/core/volume/NonConfiguredVolume.java
new file mode 100644
index 0000000..7dcbd88
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/volume/NonConfiguredVolume.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.volume;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Volume implementation which represents a Volume for which we have a FileSystem
+ * but no base path because it is not configured via {@link Property#INSTANCE_VOLUMES}.
+ * <p>
+ * This is useful to handle volumes that have been removed from accumulo-site.xml while references
+ * to them have not been updated. Such a Volume must only be used to read existing files, never to
+ * create new ones. FileSystems are compared by {@link FileSystem#getUri()}, not by instance.
+ */
+public class NonConfiguredVolume implements Volume {
+  private static final Logger log = Logger.getLogger(NonConfiguredVolume.class);
+
+  private final FileSystem fs;
+
+  public NonConfiguredVolume(FileSystem fs) {
+    this.fs = fs;
+  }
+
+  @Override
+  public FileSystem getFileSystem() {
+    return fs;
+  }
+
+  @Override
+  public String getBasePath() {
+    throw new UnsupportedOperationException("No base path known because this volume isn't configured in accumulo-site.xml");
+  }
+
+  @Override
+  public Path prefixChild(Path p) {
+    throw new UnsupportedOperationException("Cannot prefix path because this volume isn't configured in accumulo-site.xml");
+  }
+
+  @Override
+  public Path prefixChild(String p) {
+    throw new UnsupportedOperationException("Cannot prefix path because this volume isn't configured in accumulo-site.xml");
+  }
+
+  @Override
+  public boolean isValidPath(Path p) {
+    try {
+      return fs.getUri().equals(p.getFileSystem(CachedConfiguration.getInstance()).getUri());
+    } catch (IOException e) {
+      log.debug("Cannot determine FileSystem from path: " + p, e);
+    }
+    return false;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof NonConfiguredVolume) {
+      NonConfiguredVolume other = (NonConfiguredVolume) o;
+      return this.fs.getUri().equals(other.getFileSystem().getUri());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return "NonConfiguredVolume: " + this.fs.toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return NonConfiguredVolume.class.hashCode() ^ this.fs.getUri().hashCode();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/42f6b58e/core/src/test/java/org/apache/accumulo/core/volume/NonConfiguredVolumeTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/volume/NonConfiguredVolumeTest.java b/core/src/test/java/org/apache/accumulo/core/volume/NonConfiguredVolumeTest.java
new file mode 100644
index 0000000..937baf8
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/volume/NonConfiguredVolumeTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.volume;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link NonConfiguredVolume}, backed by the local FileSystem.
+ */
+public class NonConfiguredVolumeTest {
+
+  private NonConfiguredVolume volume;
+
+  @Before
+  public void create() throws IOException {
+    volume = new NonConfiguredVolume(FileSystem.getLocal(new Configuration()));
+  }
+
+  @Test
+  public void testSameFileSystem() throws IOException {
+    Assert.assertEquals(FileSystem.getLocal(new Configuration()), volume.getFileSystem());
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testGetBasePathFails() {
+    volume.getBasePath();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testPrefixChildPath() {
+    volume.prefixChild(new Path("/foo"));
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testPrefixChildString() {
+    volume.prefixChild("/foo");
+  }
+
+  @Test
+  public void testEquality() throws IOException {
+    Volume newVolume = new NonConfiguredVolume(FileSystem.getLocal(new Configuration()));
+    Assert.assertEquals(volume, newVolume);
+  }
+
+  @Test
+  public void testHashCode() throws IOException {
+    Volume newVolume = new NonConfiguredVolume(FileSystem.getLocal(new Configuration()));
+    Assert.assertEquals(volume.hashCode(), newVolume.hashCode());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/42f6b58e/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index b860f53..64a6390 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.volume.NonConfiguredVolume;
 import org.apache.accumulo.core.volume.Volume;
 import org.apache.accumulo.core.volume.VolumeConfiguration;
 import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -65,7 +66,7 @@ public class VolumeManagerImpl implements VolumeManager {
   private static final Logger log = Logger.getLogger(VolumeManagerImpl.class);
 
   Map<String,Volume> volumesByName;
-  Multimap<FileSystem,Volume> volumesByFileSystem;
+  Multimap<URI,Volume> volumesByFileSystemUri;
   Volume defaultVolume;
   AccumuloConfiguration conf;
   VolumeChooser chooser;
@@ -74,16 +75,16 @@ public class VolumeManagerImpl implements VolumeManager {
     this.volumesByName = volumes;
     this.defaultVolume = defaultVolume;
     // We may have multiple directories used in a single FileSystem (e.g. testing)
-    this.volumesByFileSystem = HashMultimap.create();
-    invertVolumesByFileSystem(volumesByName, volumesByFileSystem);
+    this.volumesByFileSystemUri = HashMultimap.create();
+    invertVolumesByFileSystem(volumesByName, volumesByFileSystemUri);
     this.conf = conf;
     ensureSyncIsEnabled();
     chooser = Property.createInstanceFromPropertyName(conf, Property.GENERAL_VOLUME_CHOOSER, VolumeChooser.class, new RandomVolumeChooser());
   }
 
-  private void invertVolumesByFileSystem(Map<String,Volume> forward, Multimap<FileSystem,Volume> inverted) {
+  private void invertVolumesByFileSystem(Map<String,Volume> forward, Multimap<URI,Volume> inverted) {
     for (Volume volume : forward.values()) {
-      inverted.put(volume.getFileSystem(), volume);
+      inverted.put(volume.getFileSystem().getUri(), volume);
     }
   }
 
@@ -299,8 +300,9 @@ public class VolumeManagerImpl implements VolumeManager {
   public Volume getVolumeByPath(Path path) {
     if (path.toString().contains(":")) {
       try {
-        FileSystem pathFs = path.getFileSystem(CachedConfiguration.getInstance());
-        Collection<Volume> candidateVolumes = volumesByFileSystem.get(pathFs);
+        FileSystem desiredFs = path.getFileSystem(CachedConfiguration.getInstance());
+        URI desiredFsUri = desiredFs.getUri();
+        Collection<Volume> candidateVolumes = volumesByFileSystemUri.get(desiredFsUri);
         if (null != candidateVolumes) {
           for (Volume candidateVolume : candidateVolumes) {
             if (candidateVolume.isValidPath(path)) {
@@ -309,11 +311,12 @@ public class VolumeManagerImpl implements VolumeManager {
           }
 
           // For the same reason as we can have multiple Volumes within a single filesystem
-          // we could also not find a matching one. We should defer back to the defaultVolume
-          // e.g. volume rename with old path references
-          log.debug("Defaulting to " + defaultVolume + " as a valid volume could not be determined for " + path);
+          // we could also not find a matching one. We should still provide a Volume with the
+          // correct FileSystem even though we don't know what the proper base dir is
+          // e.g. Files on volumes that are now removed
+          log.debug("Found no configured Volume for the given path: " + path);
 
-          return defaultVolume;
+          return new NonConfiguredVolume(desiredFs);
         }
 
         log.debug("Could not determine volume for Path '" + path + "' from defined volumes");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/42f6b58e/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
index a7f7556..6c1da2c 100644
--- a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
@@ -307,6 +307,59 @@ public class VolumeIT extends ConfigurableMacIT {
 
   }
 
+  @Test
+  public void testNonConfiguredVolumes() throws Exception {
+    // Files left on v1 must remain readable after v1 is removed from Property.INSTANCE_VOLUMES
+    String[] tableNames = getTableNames(2);
+    // grab this before shutting down cluster
+    String uuid = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers()).getInstanceID();
+
+    verifyVolumesUsed(tableNames[0], false, v1, v2);
+
+    Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+    cluster.stop();
+
+    // Reconfigure the instance to use v2 and a brand new v3, dropping v1
+    Configuration conf = new Configuration(false);
+    conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml"));
+    File v3f = new File(volDirBase, "v3");
+    Assert.assertTrue("Could not create " + v3f, v3f.mkdir() || v3f.isDirectory());
+    Path v3 = new Path("file://" + v3f.getAbsolutePath());
+
+    conf.set(Property.INSTANCE_VOLUMES.getKey(), v2.toString() + "," + v3.toString());
+    BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml")));
+    try {
+      conf.writeXml(fos);
+    } finally {
+      fos.close();
+    }
+    // initialize volume
+    Assert.assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").waitFor());
+
+    // check that all volumes are initialized
+    for (Path volumePath : Arrays.asList(v1, v2, v3)) {
+      FileSystem fs = volumePath.getFileSystem(CachedConfiguration.getInstance());
+      Path vp = new Path(volumePath, ServerConstants.INSTANCE_ID_DIR);
+      FileStatus[] iids = fs.listStatus(vp);
+      Assert.assertEquals(1, iids.length);
+      Assert.assertEquals(uuid, iids[0].getPath().getName());
+    }
+
+    // restart the cluster; v1 is no longer a configured volume
+    cluster.start();
+
+    // Make sure we can still read the tables (tableNames[0] is very likely to have a file still on v1)
+    List<String> expected = new ArrayList<String>();
+    for (int i = 0; i < 100; i++) {
+      String row = String.format("%06d", i * 100 + 3);
+      expected.add(row + ":cf1:cq1:1");
+    }
+    verifyData(expected, getConnector().createScanner(tableNames[0], Authorizations.EMPTY));
+
+    // v1 should not have any data for tableNames[1]
+    verifyVolumesUsed(tableNames[1], false, v2, v3);
+  }
+
   private void writeData(String tableName, Connector conn) throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException,
       MutationsRejectedException {
     TreeSet<Text> splits = new TreeSet<Text>();
@@ -331,7 +384,7 @@ public class VolumeIT extends ConfigurableMacIT {
   private void verifyVolumesUsed(String tableName, boolean shouldExist, Path... paths) throws AccumuloException, AccumuloSecurityException,
       TableExistsException, TableNotFoundException, MutationsRejectedException {
 
-    Connector conn = cluster.getConnector("root", ROOT_PASSWORD);
+    Connector conn = getConnector();
 
     List<String> expected = new ArrayList<String>();
     for (int i = 0; i < 100; i++) {