You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2014/01/24 06:02:24 UTC
git commit: ACCUMULO-1832 require all volumes to have same instance id and version
Updated Branches:
refs/heads/1.6.0-SNAPSHOT 40ef5d426 -> 11d803cf1
ACCUMULO-1832 require all volumes to have same instance id and version
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/11d803cf
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/11d803cf
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/11d803cf
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 11d803cf11a76785c36e0f62989ae0d326c97b2b
Parents: 40ef5d4
Author: Keith Turner <kt...@apache.org>
Authored: Fri Jan 24 00:01:56 2014 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Fri Jan 24 00:01:56 2014 -0500
----------------------------------------------------------------------
.../org/apache/accumulo/core/conf/Property.java | 5 +-
.../impl/MiniAccumuloConfigImpl.java | 2 +-
.../org/apache/accumulo/server/Accumulo.java | 10 +-
.../apache/accumulo/server/ServerConstants.java | 98 +++++++++++----
.../accumulo/server/fs/VolumeManager.java | 3 -
.../accumulo/server/fs/VolumeManagerImpl.java | 13 +-
.../apache/accumulo/server/init/Initialize.java | 82 ++++++++++---
.../accumulo/server/ServerConstantsTest.java | 113 +++++++++++++++++
.../accumulo/server/init/InitializeTest.java | 1 +
.../server/security/SystemCredentialsTest.java | 9 +-
.../tserver/TabletServerSyncCheckTest.java | 5 -
.../accumulo/tserver/log/MultiReaderTest.java | 2 +-
.../java/org/apache/accumulo/test/VolumeIT.java | 123 +++++++++++++++++++
13 files changed, 407 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 6c4b1fe..bc4c7e1 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -118,7 +118,10 @@ public enum Property {
+ " Change it before initialization. To change it later use ./bin/accumulo accumulo.server.util.ChangeSecret [oldpasswd] [newpasswd], "
+ " and then update conf/accumulo-site.xml everywhere."),
INSTANCE_VOLUMES("instance.volumes", "", PropertyType.STRING,
- "A comma seperated list of dfs uris to use. Files will be stored across these filesystems. If this is empty, then instance.dfs.uri will be used."),
+ "A comma seperated list of dfs uris to use. Files will be stored across these filesystems. If this is empty, then instance.dfs.uri will be used. "
+ + "After adding uris to this list, run 'accumulo init --add-volume' and then restart tservers. If entries are removed from this list then tservers "
+ + "will need to be restarted. After a uri is removed from the list Accumulo will not create new files in that location, however Accumulo can still "
+ + "reference files created at that location before the config change."),
INSTANCE_SECURITY_AUTHENTICATOR("instance.security.authenticator", "org.apache.accumulo.server.security.handler.ZKAuthenticator", PropertyType.CLASSNAME,
"The authenticator class that accumulo will use to determine if a user has privilege to perform an action"),
INSTANCE_SECURITY_AUTHORIZOR("instance.security.authorizor", "org.apache.accumulo.server.security.handler.ZKAuthorizor", PropertyType.CLASSNAME,
http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
index 2931aca..feb7020 100644
--- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
@@ -263,7 +263,7 @@ public class MiniAccumuloConfigImpl {
return libDir;
}
- File getConfDir() {
+ public File getConfDir() {
return confDir;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
index 15e157d..bd78929 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -63,10 +64,10 @@ public class Accumulo {
}
}
- public static synchronized int getAccumuloPersistentVersion(VolumeManager fs) {
+ public static synchronized int getAccumuloPersistentVersion(FileSystem fs, Path path) {
int dataVersion;
try {
- FileStatus[] files = fs.getDefaultVolume().listStatus(ServerConstants.getDataVersionLocation());
+ FileStatus[] files = fs.listStatus(path);
if (files == null || files.length == 0) {
dataVersion = -1; // assume it is 0.5 or earlier
} else {
@@ -78,6 +79,11 @@ public class Accumulo {
}
}
+ public static synchronized int getAccumuloPersistentVersion(VolumeManager fs) {
+ Path path = ServerConstants.getDataVersionLocation();
+ return getAccumuloPersistentVersion(fs.getFileSystemByPath(path), path);
+ }
+
public static void enableTracing(String address, String application) {
try {
DistributedTrace.enable(HdfsZooInstance.getInstance(), ZooReaderWriter.getInstance(), application, address);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
index 4c074d7..9d490e4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
@@ -17,11 +17,12 @@
package org.apache.accumulo.server;
import java.io.IOException;
+import java.util.ArrayList;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -29,6 +30,10 @@ import org.apache.hadoop.fs.Path;
public class ServerConstants {
+ public static final String VERSION_DIR = "version";
+
+ public static final String INSTANCE_ID_DIR = "instance_id";
+
/**
* current version (3) reflects additional namespace operations (ACCUMULO-802) in version 1.6.0 <br />
* (versions should never be negative)
@@ -70,27 +75,79 @@ public class ServerConstants {
return defaultBaseDir;
}
+ public static String[] getConfiguredBaseDirs() {
+ String singleNamespace = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR);
+ String ns = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_VOLUMES);
+
+ String configuredBaseDirs[];
+
+ if (ns == null || ns.isEmpty()) {
+ configuredBaseDirs = new String[] {getDefaultBaseDir()};
+ } else {
+ String namespaces[] = ns.split(",");
+ for (String namespace : namespaces) {
+ if (!namespace.contains(":")) {
+ throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_VOLUMES.getKey() + " got " + namespace);
+ }
+ }
+ configuredBaseDirs = prefix(namespaces, singleNamespace);
+ }
+
+ return configuredBaseDirs;
+ }
+
// these are functions to delay loading the Accumulo configuration unless we must
public static synchronized String[] getBaseDirs() {
if (baseDirs == null) {
- String singleNamespace = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR);
- String ns = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_VOLUMES);
-
- if (ns == null || ns.isEmpty()) {
- baseDirs = new String[] {getDefaultBaseDir()};
- } else {
- String namespaces[] = ns.split(",");
- for (String namespace : namespaces) {
- if (!namespace.contains(":")) {
- throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_VOLUMES.getKey() + " got " + namespace);
- }
- }
- baseDirs = prefix(namespaces, singleNamespace);
- }
+ baseDirs = checkBaseDirs(getConfiguredBaseDirs(), false);
}
+
return baseDirs;
}
+
+ public static String[] checkBaseDirs(String[] configuredBaseDirs, boolean ignore) {
+ // all base dirs must have same instance id and data version, any dirs that have neither should be ignored
+ String firstDir = null;
+ String firstIid = null;
+ Integer firstVersion = null;
+ ArrayList<String> baseDirsList = new ArrayList<String>();
+ for (String baseDir : configuredBaseDirs) {
+ Path path = new Path(baseDir, INSTANCE_ID_DIR);
+ String currentIid;
+ Integer currentVersion;
+ try {
+ currentIid = ZooUtil.getInstanceIDFromHdfs(new Path(baseDir, INSTANCE_ID_DIR));
+ Path vpath = new Path(baseDir, VERSION_DIR);
+ currentVersion = Accumulo.getAccumuloPersistentVersion(vpath.getFileSystem(CachedConfiguration.getInstance()), vpath);
+ } catch (Exception e) {
+ if (ignore)
+ continue;
+ else
+ throw new IllegalArgumentException("Accumulo volume " + path + " not initialized", e);
+ }
+
+ if (firstIid == null) {
+ firstIid = currentIid;
+ firstDir = baseDir;
+ firstVersion = currentVersion;
+ } else if (!currentIid.equals(firstIid)) {
+ throw new IllegalArgumentException("Configuration " + Property.INSTANCE_VOLUMES.getKey() + " contains paths that have different instance ids "
+ + baseDir + " has " + currentIid + " and " + firstDir + " has " + firstIid);
+ } else if (!currentVersion.equals(firstVersion)) {
+ throw new IllegalArgumentException("Configuration " + Property.INSTANCE_VOLUMES.getKey() + " contains paths that have different versions " + baseDir
+ + " has " + currentVersion + " and " + firstDir + " has " + firstVersion);
+ }
+
+ baseDirsList.add(baseDir);
+ }
+
+ if (baseDirsList.size() == 0) {
+ throw new RuntimeException("None of the configured paths are initialized.");
+ }
+
+ return baseDirsList.toArray(new String[baseDirsList.size()]);
+ }
public static String[] prefix(String bases[], String suffix) {
if (suffix.startsWith("/"))
@@ -123,17 +180,16 @@ public class ServerConstants {
}
public static Path getInstanceIdLocation() {
- return new Path(getBaseDirs()[0], "instance_id");
+ // all base dirs should have the same instance id, so can choose any one
+ return new Path(getBaseDirs()[0], INSTANCE_ID_DIR);
}
public static Path getDataVersionLocation() {
- return new Path(getBaseDirs()[0], "version");
- }
-
- public static String[] getRootTableDirs() {
- return prefix(getTablesDirs(), RootTable.ID);
+ // all base dirs should have the same version, so can choose any one
+ return new Path(getBaseDirs()[0], VERSION_DIR);
}
+
public static String[] getMetadataTableDirs() {
return prefix(getTablesDirs(), MetadataTable.ID);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
index 00e86d3..c2c04e5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
@@ -117,9 +117,6 @@ public interface VolumeManager {
// all volume are ready to provide service (not in SafeMode, for example)
boolean isReady() throws IOException;
- // ambiguous references to files go here
- FileSystem getDefaultVolume();
-
// forward to the appropriate FileSystem object
FileStatus[] globStatus(Path path) throws IOException;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index 034bc92..b577891 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@ -61,13 +61,13 @@ public class VolumeManagerImpl implements VolumeManager {
private static final Logger log = Logger.getLogger(VolumeManagerImpl.class);
Map<String,? extends FileSystem> volumes;
- String defaultVolume;
+ FileSystem defaultVolume;
AccumuloConfiguration conf;
VolumeChooser chooser;
protected VolumeManagerImpl(Map<String,? extends FileSystem> volumes, String defaultVolume, AccumuloConfiguration conf) {
this.volumes = volumes;
- this.defaultVolume = defaultVolume;
+ this.defaultVolume = volumes.get(defaultVolume);
this.conf = conf;
ensureSyncIsEnabled();
chooser = Property.createInstanceFromPropertyName(conf, Property.GENERAL_VOLUME_CHOOSER, VolumeChooser.class, new RandomVolumeChooser());
@@ -284,7 +284,7 @@ public class VolumeManagerImpl implements VolumeManager {
}
}
- return volumes.get(defaultVolume);
+ return defaultVolume;
}
@Override
@@ -371,7 +371,7 @@ public class VolumeManagerImpl implements VolumeManager {
if (space.contains(":")) {
fileSystems.put(space, new Path(space).getFileSystem(hadoopConf));
} else {
- fileSystems.put(space, FileSystem.get(hadoopConf));
+ throw new IllegalArgumentException("Expected fully qualified URI for " + Property.INSTANCE_VOLUMES.getKey() + " got " + space);
}
}
}
@@ -423,11 +423,6 @@ public class VolumeManagerImpl implements VolumeManager {
}
@Override
- public FileSystem getDefaultVolume() {
- return volumes.get(defaultVolume);
- }
-
- @Override
public FileStatus[] globStatus(Path pathPattern) throws IOException {
return getFileSystemByPath(pathPattern).globStatus(pathPattern);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index ea2c57a..88f4dac 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -20,6 +20,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Locale;
import java.util.Map.Entry;
import java.util.UUID;
@@ -54,6 +55,7 @@ import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
@@ -148,7 +150,7 @@ public class Initialize {
else
fsUri = FileSystem.getDefaultUri(conf).toString();
log.info("Hadoop Filesystem is " + fsUri);
- log.info("Accumulo data dirs are " + Arrays.asList(ServerConstants.getBaseDirs()));
+ log.info("Accumulo data dirs are " + Arrays.asList(ServerConstants.getConfiguredBaseDirs()));
log.info("Zookeeper server is " + sconf.get(Property.INSTANCE_ZK_HOST));
log.info("Checking if Zookeeper is available. If this hangs, then you need to make sure zookeeper is running");
if (!zookeeperAvailable()) {
@@ -170,16 +172,21 @@ public class Initialize {
try {
if (isInitialized(fs)) {
String instanceDfsDir = sconf.get(Property.INSTANCE_DFS_DIR);
- log.fatal("It appears the directory " + fsUri + instanceDfsDir + " was previously initialized.");
+ log.fatal("It appears the directories " + Arrays.asList(ServerConstants.getConfiguredBaseDirs()) + " were previously initialized.");
+ String instanceVolumes = sconf.get(Property.INSTANCE_VOLUMES);
String instanceDfsUri = sconf.get(Property.INSTANCE_DFS_URI);
- if ("".equals(instanceDfsUri)) {
- log.fatal("You are using the default URI for the filesystem. Set the property " + Property.INSTANCE_DFS_URI + " to use a different filesystem,");
- } else {
+
+ if (!instanceVolumes.isEmpty()) {
+ log.fatal("Change the property " + Property.INSTANCE_VOLUMES + " to use different filesystems,");
+ } else if (!instanceDfsDir.isEmpty()) {
log.fatal("Change the property " + Property.INSTANCE_DFS_URI + " to use a different filesystem,");
+ } else {
+ log.fatal("You are using the default URI for the filesystem. Set the property " + Property.INSTANCE_VOLUMES + " to use a different filesystem,");
}
log.fatal("or change the property " + Property.INSTANCE_DFS_DIR + " to use a different directory.");
log.fatal("The current value of " + Property.INSTANCE_DFS_URI + " is |" + instanceDfsUri + "|");
log.fatal("The current value of " + Property.INSTANCE_DFS_DIR + " is |" + instanceDfsDir + "|");
+ log.fatal("The current value of " + Property.INSTANCE_VOLUMES + " is |" + instanceVolumes + "|");
return false;
}
} catch (IOException e) {
@@ -211,7 +218,8 @@ public class Initialize {
UUID uuid = UUID.randomUUID();
// the actual disk locations of the root table and tablets
- final Path rootTablet = new Path(fs.choose(ServerConstants.getTablesDirs()) + "/" + RootTable.ID + RootTable.ROOT_TABLET_LOCATION);
+ String[] configuredTableDirs = ServerConstants.prefix(ServerConstants.getConfiguredBaseDirs(), ServerConstants.TABLE_DIR);
+ final Path rootTablet = new Path(fs.choose(configuredTableDirs) + "/" + RootTable.ID + RootTable.ROOT_TABLET_LOCATION);
try {
initZooKeeper(opts, uuid.toString(), instanceNamePath, rootTablet);
} catch (Exception e) {
@@ -269,23 +277,32 @@ public class Initialize {
return result;
}
+ private static void initDirs(VolumeManager fs, UUID uuid, String[] baseDirs, boolean print) throws IOException {
+ for (String baseDir : baseDirs) {
+ fs.mkdirs(new Path(new Path(baseDir, ServerConstants.VERSION_DIR), "" + ServerConstants.DATA_VERSION));
+
+ // create an instance id
+ Path iidLocation = new Path(baseDir, ServerConstants.INSTANCE_ID_DIR);
+ fs.mkdirs(iidLocation);
+ fs.createNewFile(new Path(iidLocation, uuid.toString()));
+ if (print)
+ log.info("Initialized volume " + baseDir);
+ }
+ }
+
// TODO Remove deprecation warning suppression when Hadoop1 support is dropped
@SuppressWarnings("deprecation")
private static void initFileSystem(Opts opts, VolumeManager fs, UUID uuid, Path rootTablet) throws IOException {
FileStatus fstat;
+ initDirs(fs, uuid, ServerConstants.getConfiguredBaseDirs(), false);
+
// the actual disk locations of the metadata table and tablets
final Path[] metadataTableDirs = paths(ServerConstants.getMetadataTableDirs());
String tableMetadataTabletDir = fs.choose(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), TABLE_TABLETS_TABLET_DIR));
String defaultMetadataTabletDir = fs.choose(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), Constants.DEFAULT_TABLET_LOCATION));
- fs.mkdirs(new Path(ServerConstants.getDataVersionLocation(), "" + ServerConstants.DATA_VERSION));
-
- // create an instance id
- fs.mkdirs(ServerConstants.getInstanceIdLocation());
- fs.createNewFile(new Path(ServerConstants.getInstanceIdLocation(), uuid.toString()));
-
// initialize initial metadata config in zookeeper
initMetadataConfig();
@@ -531,10 +548,38 @@ public class Initialize {
}
public static boolean isInitialized(VolumeManager fs) throws IOException {
- return (fs.exists(ServerConstants.getInstanceIdLocation()) || fs.exists(ServerConstants.getDataVersionLocation()));
+ for (String baseDir : ServerConstants.getConfiguredBaseDirs()) {
+ if (fs.exists(new Path(baseDir, ServerConstants.INSTANCE_ID_DIR)) || fs.exists(new Path(baseDir, ServerConstants.VERSION_DIR)))
+ return true;
+ }
+
+ return false;
+ }
+
+ private static void addVolumes(VolumeManager fs) throws IOException {
+ HashSet<String> initializedDirs = new HashSet<String>();
+ initializedDirs.addAll(Arrays.asList(ServerConstants.checkBaseDirs(ServerConstants.getConfiguredBaseDirs(), true)));
+
+ HashSet<String> uinitializedDirs = new HashSet<String>();
+ uinitializedDirs.addAll(Arrays.asList(ServerConstants.getConfiguredBaseDirs()));
+ uinitializedDirs.removeAll(initializedDirs);
+
+ Path aBasePath = new Path(initializedDirs.iterator().next());
+ Path iidPath = new Path(aBasePath, ServerConstants.INSTANCE_ID_DIR);
+ Path versionPath = new Path(aBasePath, ServerConstants.VERSION_DIR);
+
+ UUID uuid = UUID.fromString(ZooUtil.getInstanceIDFromHdfs(iidPath));
+
+ if (ServerConstants.DATA_VERSION != Accumulo.getAccumuloPersistentVersion(versionPath.getFileSystem(CachedConfiguration.getInstance()), versionPath)) {
+ throw new IOException("Accumulo " + Constants.VERSION + " cannot initialize data version " + Accumulo.getAccumuloPersistentVersion(fs));
+ }
+
+ initDirs(fs, uuid, uinitializedDirs.toArray(new String[uinitializedDirs.size()]), true);
}
static class Opts extends Help {
+ @Parameter(names = "--add-volumes", description = "Initialize any uninitialized volumes listed in instance.volumes")
+ boolean addVolumes = false;
@Parameter(names = "--reset-security", description = "just update the security information")
boolean resetSecurity = false;
@Parameter(names = "--clear-instance-name", description = "delete any existing instance name without prompting")
@@ -565,8 +610,15 @@ public class Initialize {
} else {
log.fatal("Attempted to reset security on accumulo before it was initialized");
}
- } else if (!doInit(opts, conf, fs))
- System.exit(-1);
+ }
+
+ if (opts.addVolumes) {
+ addVolumes(fs);
+ }
+
+ if (!opts.resetSecurity && !opts.addVolumes)
+ if (!doInit(opts, conf, fs))
+ System.exit(-1);
} catch (Exception e) {
log.fatal(e, e);
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/base/src/test/java/org/apache/accumulo/server/ServerConstantsTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/ServerConstantsTest.java b/server/base/src/test/java/org/apache/accumulo/server/ServerConstantsTest.java
new file mode 100644
index 0000000..a316155
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/ServerConstantsTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ *
+ */
+public class ServerConstantsTest {
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+ @Test
+ public void testCheckBaseDirs() throws IOException {
+ String uuid1 = UUID.randomUUID().toString();
+ String uuid2 = UUID.randomUUID().toString();
+
+ verifyAllPass(init(folder.newFolder(), Arrays.asList(uuid1), Arrays.asList(ServerConstants.DATA_VERSION)));
+ verifyAllPass(init(folder.newFolder(), Arrays.asList(uuid1, uuid1), Arrays.asList(ServerConstants.DATA_VERSION, ServerConstants.DATA_VERSION)));
+
+ verifyError(init(folder.newFolder(), Arrays.asList((String) null), Arrays.asList((Integer) null)));
+ verifyError(init(folder.newFolder(), Arrays.asList(uuid1, uuid2), Arrays.asList(ServerConstants.DATA_VERSION, ServerConstants.DATA_VERSION)));
+ verifyError(init(folder.newFolder(), Arrays.asList(uuid1, uuid1), Arrays.asList(ServerConstants.DATA_VERSION, ServerConstants.DATA_VERSION - 1)));
+ verifyError(init(folder.newFolder(), Arrays.asList(uuid1, uuid2), Arrays.asList(ServerConstants.DATA_VERSION, ServerConstants.DATA_VERSION - 1)));
+ verifyError(init(folder.newFolder(), Arrays.asList(uuid1, uuid2, null),
+ Arrays.asList(ServerConstants.DATA_VERSION, ServerConstants.DATA_VERSION, ServerConstants.DATA_VERSION)));
+
+ verifySomePass(
+ init(folder.newFolder(), Arrays.asList(uuid1, uuid1, null), Arrays.asList(ServerConstants.DATA_VERSION, ServerConstants.DATA_VERSION, null)), 2);
+ }
+
+ private void verifyAllPass(ArrayList<String> paths) {
+ Assert.assertEquals(paths, Arrays.asList(ServerConstants.checkBaseDirs(paths.toArray(new String[paths.size()]), true)));
+ Assert.assertEquals(paths, Arrays.asList(ServerConstants.checkBaseDirs(paths.toArray(new String[paths.size()]), false)));
+ }
+
+ private void verifySomePass(ArrayList<String> paths, int numExpected) {
+ Assert.assertEquals(paths.subList(0, 2), Arrays.asList(ServerConstants.checkBaseDirs(paths.toArray(new String[paths.size()]), true)));
+ try {
+ ServerConstants.checkBaseDirs(paths.toArray(new String[paths.size()]), false);
+ Assert.fail();
+ } catch (Exception e) {}
+ }
+
+ private void verifyError(ArrayList<String> paths) {
+ try {
+ ServerConstants.checkBaseDirs(paths.toArray(new String[paths.size()]), true);
+ Assert.fail();
+ } catch (Exception e) {}
+
+ try {
+ ServerConstants.checkBaseDirs(paths.toArray(new String[paths.size()]), false);
+ Assert.fail();
+ } catch (Exception e) {}
+ }
+
+ private ArrayList<String> init(File newFile, List<String> uuids, List<Integer> dataVersions) throws IllegalArgumentException, IOException {
+ String base = newFile.toURI().toString();
+
+ LocalFileSystem fs = FileSystem.getLocal(new Configuration());
+
+ ArrayList<String> accumuloPaths = new ArrayList<String>();
+
+ for (int i = 0; i < uuids.size(); i++) {
+ String volume = "v" + i;
+
+ String accumuloPath = base + "/" + volume + "/accumulo";
+ accumuloPaths.add(accumuloPath);
+
+ if (uuids.get(i) != null) {
+ fs.mkdirs(new Path(accumuloPath + "/" + ServerConstants.INSTANCE_ID_DIR));
+ fs.createNewFile(new Path(accumuloPath + "/" + ServerConstants.INSTANCE_ID_DIR + "/" + uuids.get(i)));
+ }
+
+ if (dataVersions.get(i) != null) {
+ fs.mkdirs(new Path(accumuloPath + "/" + ServerConstants.VERSION_DIR));
+ fs.createNewFile(new Path(accumuloPath + "/" + ServerConstants.VERSION_DIR + "/" + dataVersions.get(i)));
+ }
+ }
+
+ return accumuloPaths;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java b/server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java
index d308d06..251d859 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java
@@ -94,6 +94,7 @@ public class InitializeTest {
expect(sconf.get(Property.INSTANCE_DFS_URI)).andReturn("hdfs://foo");
expectLastCall().anyTimes();
expect(sconf.get(Property.INSTANCE_DFS_DIR)).andReturn("/bar");
+ expect(sconf.get(Property.INSTANCE_VOLUMES)).andReturn("");
expect(sconf.get(Property.INSTANCE_ZK_HOST)).andReturn("zk1");
expect(sconf.get(Property.INSTANCE_SECRET)).andReturn(Property.INSTANCE_SECRET.getDefaultValue());
replay(sconf);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java b/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
index f29fb27..c8610d5 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.ConnectorImpl;
import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.security.SystemCredentials.SystemToken;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -37,12 +38,18 @@ public class SystemCredentialsTest {
@BeforeClass
public static void setUp() throws IOException {
- File testInstanceId = new File(new File(new File(new File("target"), "instanceTest"), "instance_id"), UUID.fromString(
+ File testInstanceId = new File(new File(new File(new File("target"), "instanceTest"), ServerConstants.INSTANCE_ID_DIR), UUID.fromString(
"00000000-0000-0000-0000-000000000000").toString());
if (!testInstanceId.exists()) {
testInstanceId.getParentFile().mkdirs();
testInstanceId.createNewFile();
}
+
+ File testInstanceVersion = new File(new File(new File(new File("target"), "instanceTest"), ServerConstants.VERSION_DIR), ServerConstants.DATA_VERSION + "");
+ if (!testInstanceVersion.exists()) {
+ testInstanceVersion.getParentFile().mkdirs();
+ testInstanceVersion.createNewFile();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java
index 57f16b4..590945a 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java
@@ -211,11 +211,6 @@ public class TabletServerSyncCheckTest {
}
@Override
- public FileSystem getDefaultVolume() {
- return null;
- }
-
- @Override
public FileStatus[] globStatus(Path path) throws IOException {
return null;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
index 53fb27c..c4d3dfb 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
@@ -48,7 +48,7 @@ public class MultiReaderTest {
Path root = new Path("file://" + path + "/manyMaps");
fs.mkdirs(root);
fs.create(new Path(root, "finished")).close();
- FileSystem ns = fs.getDefaultVolume();
+ FileSystem ns = fs.getFileSystemByPath(root);
@SuppressWarnings("deprecation")
Writer oddWriter = new Writer(ns.getConf(), ns, new Path(root, "odd").toString(), IntWritable.class, BytesWritable.class);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/11d803cf/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
index 2201ad2..2f64d58 100644
--- a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
@@ -19,19 +19,28 @@ package org.apache.accumulo.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.io.BufferedOutputStream;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.SortedSet;
import java.util.TreeSet;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.admin.DiskUsage;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
@@ -44,9 +53,16 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.init.Initialize;
+import org.apache.accumulo.server.util.Admin;
import org.apache.accumulo.test.functional.ConfigurableMacIT;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.After;
@@ -83,6 +99,7 @@ public class VolumeIT extends ConfigurableMacIT {
public void configure(MiniAccumuloConfigImpl cfg) {
// Run MAC on two locations in the local file system
cfg.setProperty(Property.INSTANCE_DFS_URI, v1.toString());
+ cfg.setProperty(Property.INSTANCE_DFS_DIR, "/accumulo");
cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString());
super.configure(cfg);
}
@@ -233,4 +250,110 @@ public class VolumeIT extends ConfigurableMacIT {
}
}
+
+
+ @Test
+ public void testAddVolumes() throws Exception {
+
+ String[] tableNames = getTableNames(2);
+
+ // grab this before shutting down cluster
+ String uuid = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers()).getInstanceID();
+
+ verifyVolumesUsed(tableNames[0], v1, v2);
+
+ Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+ cluster.stop();
+
+ Configuration conf = new Configuration(false);
+ conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml"));
+
+ File v3f = new File(volDirBase, "v3");
+ v3f.mkdir();
+ Path v3 = new Path("file://" + v3f.getAbsolutePath());
+
+ conf.set(Property.INSTANCE_VOLUMES.getKey(), v1.toString() + "," + v2.toString()+","+v3.toString());
+ BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml")));
+ conf.writeXml(fos);
+ fos.close();
+
+ // initialize volume
+ Assert.assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").waitFor());
+
+ // check that all volumes are initialized
+ for (Path volumePath : Arrays.asList(v1, v2, v3)) {
+ FileSystem fs = volumePath.getFileSystem(CachedConfiguration.getInstance());
+ Path vp = new Path(volumePath, "accumulo");
+ Path vpi = new Path(vp, ServerConstants.INSTANCE_ID_DIR);
+ FileStatus[] iids = fs.listStatus(vpi);
+ Assert.assertEquals(1, iids.length);
+ Assert.assertEquals(uuid, iids[0].getPath().getName());
+ }
+
+ // start cluster and verify that new volume is used
+ cluster.start();
+
+ verifyVolumesUsed(tableNames[1], v1, v2, v3);
+
+ }
+
+ private void verifyVolumesUsed(String tableName, Path... paths) throws AccumuloException, AccumuloSecurityException, TableExistsException,
+ TableNotFoundException,
+ MutationsRejectedException {
+ TreeSet<Text> splits = new TreeSet<Text>();
+ for (int i = 0; i < 100; i++) {
+ splits.add(new Text(String.format("%06d", i * 100)));
+ }
+
+ Connector conn = cluster.getConnector("root", ROOT_PASSWORD);
+ conn.tableOperations().create(tableName);
+ conn.tableOperations().addSplits(tableName, splits);
+
+ List<String> expected = new ArrayList<String>();
+
+ BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ for (int i = 0; i < 100; i++) {
+ String row = String.format("%06d", i * 100 + 3);
+ Mutation m = new Mutation(row);
+ m.put("cf1", "cq1", "1");
+ bw.addMutation(m);
+ expected.add(row + ":cf1:cq1:1");
+ }
+
+ bw.close();
+
+ verifyData(expected, conn.createScanner(tableName, Authorizations.EMPTY));
+
+ conn.tableOperations().flush(tableName, null, null, true);
+
+ verifyData(expected, conn.createScanner(tableName, Authorizations.EMPTY));
+
+ String tableId = conn.tableOperations().tableIdMap().get(tableName);
+ Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ metaScanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+ metaScanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+
+ int counts[] = new int[paths.length];
+
+ for (Entry<Key,Value> entry : metaScanner) {
+ String cq = entry.getKey().getColumnQualifier().toString();
+ for (int i = 0; i < paths.length; i++) {
+ if (cq.startsWith(paths[i].toString())) {
+ counts[i]++;
+ }
+ }
+ }
+
+ // If a volume is chosen uniformly at random for each tablet, then the probability that a given volume is not chosen for any tablet is
+ // ((num_volumes - 1)/num_volumes)^num_tablets. For 100 tablets and 3 volumes, the probability that a given volume is never chosen is (2/3)^100 = 2.46e-18.
+
+ int sum = 0;
+ for (int count : counts) {
+ Assert.assertTrue(count > 0);
+ sum += count;
+ }
+
+ Assert.assertEquals(100, sum);
+ }
+
}