You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/12/02 23:54:25 UTC
[1/4] git commit: ACCUMULO-1947 Remove forcefully setting the logger
in a test with no reset after the test is done.
Updated Branches:
refs/heads/master c7062890d -> ccec95b69
ACCUMULO-1947 Remove forcefully setting the logger in a test with no reset after the test is done.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c7fc7765
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c7fc7765
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c7fc7765
Branch: refs/heads/master
Commit: c7fc776562275e417b0497ba47a2a5a212eca04f
Parents: 42e8df7
Author: Josh Elser <el...@apache.org>
Authored: Mon Dec 2 16:59:51 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Mon Dec 2 16:59:51 2013 -0500
----------------------------------------------------------------------
.../apache/accumulo/server/tabletserver/log/MultiReaderTest.java | 2 --
1 file changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c7fc7765/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java b/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java
index 93583bf..ee43f69 100644
--- a/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java
+++ b/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java
@@ -41,8 +41,6 @@ public class MultiReaderTest {
@Before
public void setUp() throws Exception {
- // quiet log messages about compress.CodecPool
- Logger.getRootLogger().setLevel(Level.ERROR);
fs = FileSystem.getLocal(conf);
Path root = new Path("manyMaps");
fs.mkdirs(root);
[2/4] git commit: ACCUMULO-1947 Use
DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY instead of the CreateFlag class as
a better way to check whether or not we warn if the configuration doesn't
contain dfs.datanode.synconclose=true
Posted by el...@apache.org.
ACCUMULO-1947 Use DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY instead of the CreateFlag class as a better way to check
whether or not we warn if the configuration doesn't contain dfs.datanode.synconclose=true
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/cd96f85e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/cd96f85e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/cd96f85e
Branch: refs/heads/master
Commit: cd96f85e0ec24be01d38c3a3dc78380f3b945d0b
Parents: c7fc776
Author: Josh Elser <el...@apache.org>
Authored: Mon Dec 2 17:13:44 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Mon Dec 2 17:13:44 2013 -0500
----------------------------------------------------------------------
.../accumulo/server/tabletserver/TabletServer.java | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cd96f85e/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index b4d4780..aa52834 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -3285,14 +3285,20 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
try {
- // if this class exists
- Class.forName("org.apache.hadoop.fs.CreateFlag");
- // we're running hadoop 2.0, 1.1
+ // Check DFSConfigKeys to see if DFS_DATANODE_SYNCONCLOSE_KEY exists (should be everything >=1.1.1 and the 0.23 line)
+ Class<?> dfsConfigKeysClz = Class.forName("org.apache.hadoop.hdfs.DFSConfigKeys");
+ dfsConfigKeysClz.getDeclaredField("DFS_DATANODE_SYNCONCLOSE_KEY");
+
+ // Everything else
if (!fs.getConf().getBoolean("dfs.datanode.synconclose", false)) {
- log.warn("dfs.datanode.synconclose set to false: data loss is possible on system reset or power loss");
+ log.warn("dfs.datanode.synconclose set to false in hdfs-site.xml: data loss is possible on system reset or power loss");
}
} catch (ClassNotFoundException ex) {
- // hadoop 1.0
+ // hadoop 1.0.X or hadoop 1.1.0
+ } catch (SecurityException e) {
+ // hadoop 1.0.X or hadoop 1.1.0
+ } catch (NoSuchFieldException e) {
+ // hadoop 1.0.X or hadoop 1.1.0
}
}
[3/4] git commit: Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Posted by el...@apache.org.
Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Conflicts:
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/890ee25c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/890ee25c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/890ee25c
Branch: refs/heads/master
Commit: 890ee25cf3b9c7d1fd256a3b3237e7c3030e4183
Parents: 360f0cf cd96f85
Author: Josh Elser <el...@apache.org>
Authored: Mon Dec 2 17:53:54 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Mon Dec 2 17:53:54 2013 -0500
----------------------------------------------------------------------
.../accumulo/server/fs/VolumeManagerImpl.java | 17 +++++++++++------
.../accumulo/tserver/log/MultiReaderTest.java | 2 --
2 files changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/890ee25c/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index 39afe75,0000000..472b0c0
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@@ -1,474 -1,0 +1,479 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+public class VolumeManagerImpl implements VolumeManager {
+
+ private static final Logger log = Logger.getLogger(VolumeManagerImpl.class);
+
+ Map<String,? extends FileSystem> volumes;
+ String defaultVolume;
+ AccumuloConfiguration conf;
+ VolumeChooser chooser;
+
+ protected VolumeManagerImpl(Map<String,? extends FileSystem> volumes, String defaultVolume, AccumuloConfiguration conf) {
+ this.volumes = volumes;
+ this.defaultVolume = defaultVolume;
+ this.conf = conf;
+ ensureSyncIsEnabled();
+ chooser = Property.createInstanceFromPropertyName(conf, Property.GENERAL_VOLUME_CHOOSER, VolumeChooser.class, new RandomVolumeChooser());
+ }
+
+ public static org.apache.accumulo.server.fs.VolumeManager getLocal() throws IOException {
+ return new VolumeManagerImpl(Collections.singletonMap("", FileSystem.getLocal(CachedConfiguration.getInstance())), "",
+ DefaultConfiguration.getDefaultConfiguration());
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOException ex = null;
+ for (FileSystem fs : volumes.values()) {
+ try {
+ fs.close();
+ } catch (IOException e) {
+ ex = e;
+ }
+ }
+ if (ex != null) {
+ throw ex;
+ }
+ }
+
+ @Override
+ public boolean closePossiblyOpenFile(Path path) throws IOException {
+ FileSystem fs = getFileSystemByPath(path);
+ if (fs instanceof DistributedFileSystem) {
+ DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ try {
+ return dfs.recoverLease(path);
+ } catch (FileNotFoundException ex) {
+ throw ex;
+ }
+ } else if (fs instanceof LocalFileSystem) {
+ // ignore
+ } else {
+ throw new IllegalStateException("Don't know how to recover a lease for " + fs.getClass().getName());
+ }
+ fs.append(path).close();
+ log.info("Recovered lease on " + path.toString() + " using append");
+ return true;
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path) throws IOException {
+ FileSystem fs = getFileSystemByPath(path);
+ return fs.create(path);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path, boolean overwrite) throws IOException {
+ FileSystem fs = getFileSystemByPath(path);
+ return fs.create(path, overwrite);
+ }
+
+ private static long correctBlockSize(Configuration conf, long blockSize) {
+ if (blockSize <= 0)
+ blockSize = conf.getLong("dfs.block.size", 67108864);
+
+ int checkSum = conf.getInt("io.bytes.per.checksum", 512);
+ blockSize -= blockSize % checkSum;
+ blockSize = Math.max(blockSize, checkSum);
+ return blockSize;
+ }
+
+ private static int correctBufferSize(Configuration conf, int bufferSize) {
+ if (bufferSize <= 0)
+ bufferSize = conf.getInt("io.file.buffer.size", 4096);
+ return bufferSize;
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException {
+ FileSystem fs = getFileSystemByPath(path);
+ if (bufferSize == 0) {
+ fs.getConf().getInt("io.file.buffer.size", 4096);
+ }
+ return fs.create(path, overwrite, bufferSize, replication, correctBlockSize(fs.getConf(), blockSize));
+ }
+
+ @Override
+ public boolean createNewFile(Path path) throws IOException {
+ FileSystem fs = getFileSystemByPath(path);
+ return fs.createNewFile(path);
+ }
+
+ @Override
+ public FSDataOutputStream createSyncable(Path logPath, int bufferSize, short replication, long blockSize) throws IOException {
+ FileSystem fs = getFileSystemByPath(logPath);
+ blockSize = correctBlockSize(fs.getConf(), blockSize);
+ bufferSize = correctBufferSize(fs.getConf(), bufferSize);
+ try {
+ // This...
+ // EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
+ // return fs.create(logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
+ // Becomes this:
+ Class<?> createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag");
+ List<Enum<?>> flags = new ArrayList<Enum<?>>();
+ if (createFlags.isEnum()) {
+ for (Object constant : createFlags.getEnumConstants()) {
+ if (constant.toString().equals("SYNC_BLOCK")) {
+ flags.add((Enum<?>) constant);
+ log.debug("Found synch enum " + constant);
+ }
+ if (constant.toString().equals("CREATE")) {
+ flags.add((Enum<?>) constant);
+ log.debug("Found CREATE enum " + constant);
+ }
+ }
+ }
+ Object set = EnumSet.class.getMethod("of", java.lang.Enum.class, java.lang.Enum.class).invoke(null, flags.get(0), flags.get(1));
+ log.debug("CreateFlag set: " + set);
+ Method create = fs.getClass().getMethod("create", Path.class, FsPermission.class, EnumSet.class, Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class);
+ log.debug("creating " + logPath + " with SYNCH_BLOCK flag");
+ return (FSDataOutputStream) create.invoke(fs, logPath, FsPermission.getDefault(), set, bufferSize, replication, blockSize, null);
+ } catch (ClassNotFoundException ex) {
+ // Expected in hadoop 1.0
+ return fs.create(logPath, true, bufferSize, replication, blockSize);
+ } catch (Exception ex) {
+ log.debug(ex, ex);
+ return fs.create(logPath, true, bufferSize, replication, blockSize);
+ }
+ }
+
+ @Override
+ public boolean delete(Path path) throws IOException {
+ return getFileSystemByPath(path).delete(path, false);
+ }
+
+ @Override
+ public boolean deleteRecursively(Path path) throws IOException {
+ return getFileSystemByPath(path).delete(path, true);
+ }
+
+ protected void ensureSyncIsEnabled() {
+ for (Entry<String,? extends FileSystem> entry : getFileSystems().entrySet()) {
+ final String volumeName = entry.getKey();
+ final FileSystem fs = entry.getValue();
+
+ if (fs instanceof DistributedFileSystem) {
+ final String DFS_DURABLE_SYNC = "dfs.durable.sync", DFS_SUPPORT_APPEND = "dfs.support.append";
+ final String ticketMessage = "See ACCUMULO-623 and ACCUMULO-1637 for more details.";
+ // Check to make sure that we have proper defaults configured
+ try {
+ // If the default is off (0.20.205.x or 1.0.x)
+ DFSConfigKeys configKeys = new DFSConfigKeys();
+
+ // Can't use the final constant itself as Java will inline it at compile time
+ Field dfsSupportAppendDefaultField = configKeys.getClass().getField("DFS_SUPPORT_APPEND_DEFAULT");
+ boolean dfsSupportAppendDefaultValue = dfsSupportAppendDefaultField.getBoolean(configKeys);
+
+ if (!dfsSupportAppendDefaultValue) {
+ // See if the user did the correct override
+ if (!fs.getConf().getBoolean(DFS_SUPPORT_APPEND, false)) {
+ String msg = "Accumulo requires that dfs.support.append to true. " + ticketMessage;
+ log.fatal(msg);
+ throw new RuntimeException(msg);
+ }
+ }
+ } catch (NoSuchFieldException e) {
+ // If we can't find DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT, the user is running
+ // 1.1.x or 1.2.x. This is ok, though, as, by default, these versions have append/sync enabled.
+ } catch (Exception e) {
+ log.warn("Error while checking for " + DFS_SUPPORT_APPEND + " on volume " + volumeName + ". The user should ensure that Hadoop is configured to properly supports append and sync. " + ticketMessage, e);
+ }
+
+ // If either of these parameters are configured to be false, fail.
+ // This is a sign that someone is writing bad configuration.
+ if (!fs.getConf().getBoolean(DFS_SUPPORT_APPEND, true) || !fs.getConf().getBoolean(DFS_DURABLE_SYNC, true)) {
+ String msg = "Accumulo requires that " + DFS_SUPPORT_APPEND + " and " + DFS_DURABLE_SYNC + " not be configured as false. " + ticketMessage;
+ log.fatal(msg);
+ throw new RuntimeException(msg);
+ }
+
+ try {
- // if this class exists
- Class.forName("org.apache.hadoop.fs.CreateFlag");
- // we're running hadoop 2.0, 1.1
++ // Check DFSConfigKeys to see if DFS_DATANODE_SYNCONCLOSE_KEY exists (should be everything >=1.1.1 and the 0.23 line)
++ Class<?> dfsConfigKeysClz = Class.forName("org.apache.hadoop.hdfs.DFSConfigKeys");
++ dfsConfigKeysClz.getDeclaredField("DFS_DATANODE_SYNCONCLOSE_KEY");
++
++ // Everything else
+ if (!fs.getConf().getBoolean("dfs.datanode.synconclose", false)) {
- log.warn("dfs.datanode.synconclose set to false: data loss is possible on system reset or power loss on volume " + volumeName);
++ log.warn("dfs.datanode.synconclose set to false in hdfs-site.xml: data loss is possible on system reset or power loss");
+ }
+ } catch (ClassNotFoundException ex) {
- // hadoop 1.0
++ // hadoop 1.0.X or hadoop 1.1.0
++ } catch (SecurityException e) {
++ // hadoop 1.0.X or hadoop 1.1.0
++ } catch (NoSuchFieldException e) {
++ // hadoop 1.0.X or hadoop 1.1.0
+ }
+ }
+ }
-
+ }
+
+ @Override
+ public boolean exists(Path path) throws IOException {
+ return getFileSystemByPath(path).exists(path);
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ return getFileSystemByPath(path).getFileStatus(path);
+ }
+
+ @Override
+ public FileSystem getFileSystemByPath(Path path) {
+ if (path.toString().contains(":")) {
+ try {
+ return path.getFileSystem(CachedConfiguration.getInstance());
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ return volumes.get(defaultVolume);
+ }
+
+ @Override
+ public Map<String,? extends FileSystem> getFileSystems() {
+ return volumes;
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+ return getFileSystemByPath(path).listStatus(path);
+ }
+
+ @Override
+ public boolean mkdirs(Path path) throws IOException {
+ return getFileSystemByPath(path).mkdirs(path);
+ }
+
+ @Override
+ public FSDataInputStream open(Path path) throws IOException {
+ return getFileSystemByPath(path).open(path);
+ }
+
+ @Override
+ public boolean rename(Path path, Path newPath) throws IOException {
+ FileSystem source = getFileSystemByPath(path);
+ FileSystem dest = getFileSystemByPath(newPath);
+ if (source != dest) {
+ throw new NotImplementedException("Cannot rename files across volumes: " + path + " -> " + newPath);
+ }
+ return source.rename(path, newPath);
+ }
+
+ @Override
+ public boolean moveToTrash(Path path) throws IOException {
+ FileSystem fs = getFileSystemByPath(path);
+ Trash trash = new Trash(fs, fs.getConf());
+ return trash.moveToTrash(path);
+ }
+
+ @Override
+ public short getDefaultReplication(Path path) {
+ @SuppressWarnings("deprecation")
+ short rep = getFileSystemByPath(path).getDefaultReplication();
+ return rep;
+ }
+
+ @Override
+ public boolean isFile(Path path) throws IOException {
+ return getFileSystemByPath(path).isFile(path);
+ }
+
+ public static VolumeManager get() throws IOException {
+ AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance());
+ return get(conf);
+ }
+
+ static private final String DEFAULT = "";
+
+ public static VolumeManager get(AccumuloConfiguration conf) throws IOException {
+ Map<String,FileSystem> fileSystems = new HashMap<String,FileSystem>();
+ Configuration hadoopConf = CachedConfiguration.getInstance();
+ fileSystems.put(DEFAULT, FileUtil.getFileSystem(hadoopConf, conf));
+ String ns = conf.get(Property.INSTANCE_VOLUMES);
+ if (ns != null && !ns.isEmpty()) {
+ for (String space : ns.split(",")) {
+ if (space.equals(DEFAULT))
+ throw new IllegalArgumentException();
+
+ if (space.contains(":")) {
+ fileSystems.put(space, new Path(space).getFileSystem(hadoopConf));
+ } else {
+ fileSystems.put(space, FileSystem.get(hadoopConf));
+ }
+ }
+ }
+ return new VolumeManagerImpl(fileSystems, DEFAULT, conf);
+ }
+
+ @Override
+ public boolean isReady() throws IOException {
+ for (FileSystem fs : getFileSystems().values()) {
+ if (!(fs instanceof DistributedFileSystem))
+ continue;
+ DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
+ // Becomes this:
+ Class<?> safeModeAction;
+ try {
+ // hadoop 2.0
+ safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
+ } catch (ClassNotFoundException ex) {
+ // hadoop 1.0
+ try {
+ safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Cannot figure out the right class for Constants");
+ }
+ }
+ Object get = null;
+ for (Object obj : safeModeAction.getEnumConstants()) {
+ if (obj.toString().equals("SAFEMODE_GET"))
+ get = obj;
+ }
+ if (get == null) {
+ throw new RuntimeException("cannot find SAFEMODE_GET");
+ }
+ try {
+ Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
+ boolean inSafeMode = (Boolean) setSafeMode.invoke(dfs, get);
+ if (inSafeMode) {
+ return false;
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException("cannot find method setSafeMode");
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public FileSystem getDefaultVolume() {
+ return volumes.get(defaultVolume);
+ }
+
+ @Override
+ public FileStatus[] globStatus(Path pathPattern) throws IOException {
+ return getFileSystemByPath(pathPattern).globStatus(pathPattern);
+ }
+
+ @Override
+ public Path getFullPath(Key key) {
+ // TODO sanity check col fam
+ String relPath = key.getColumnQualifierData().toString();
+ byte[] tableId = KeyExtent.tableOfMetadataRow(key.getRow());
+ return getFullPath(new String(tableId), relPath);
+ }
+
+ @Override
+ public Path matchingFileSystem(Path source, String[] options) {
+ URI uri1 = source.toUri();
+ for (String option : options) {
+ URI uri3 = URI.create(option);
+ if (uri1.getScheme().equals(uri3.getScheme())) {
+ String a1 = uri1.getAuthority();
+ String a2 = uri3.getAuthority();
+ if (a1 == a2 || (a1 != null && a1.equals(a2)))
+ return new Path(option);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Path getFullPath(String tableId, String path) {
+ if (path.contains(":"))
+ return new Path(path);
+
+ if (path.startsWith("../"))
+ path = path.substring(2);
+ else if (path.startsWith("/"))
+ path = "/" + tableId + path;
+ else
+ throw new IllegalArgumentException("Unexpected path prefix " + path);
+
+ return getFullPath(FileType.TABLE, path);
+ }
+
+ @Override
+ public Path getFullPath(FileType fileType, String path) {
+ if (path.contains(":"))
+ return new Path(path);
+
+ // normalize the path
+ Path fullPath = new Path(ServerConstants.getDefaultBaseDir(), fileType.getDirectory());
+ if (path.startsWith("/"))
+ path = path.substring(1);
+ fullPath = new Path(fullPath, path);
+
+ FileSystem fs = getFileSystemByPath(fullPath);
+ return fs.makeQualified(fullPath);
+ }
+
+ @Override
+ public ContentSummary getContentSummary(Path dir) throws IOException {
+ return getFileSystemByPath(dir).getContentSummary(dir);
+ }
+
+ @Override
+ public String choose(String[] options) {
+ return chooser.choose(options);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/890ee25c/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
----------------------------------------------------------------------
diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
index bb96295,0000000..168842e
mode 100644,000000..100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
@@@ -1,151 -1,0 +1,149 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapFile.Writer;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class MultiReaderTest {
+
+ VolumeManager fs;
+ TemporaryFolder root = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+ @Before
+ public void setUp() throws Exception {
- // quiet log messages about compress.CodecPool
- Logger.getRootLogger().setLevel(Level.ERROR);
+ fs = VolumeManagerImpl.getLocal();
+ root.create();
+ String path = root.getRoot().getAbsolutePath();
+ Path root = new Path("file://" + path + "/manyMaps");
+ fs.mkdirs(root);
+ fs.create(new Path(root, "finished")).close();
+ FileSystem ns = fs.getDefaultVolume();
+
+ @SuppressWarnings("deprecation")
+ Writer oddWriter = new Writer(ns.getConf(), ns, new Path(root, "odd").toString(), IntWritable.class, BytesWritable.class);
+ BytesWritable value = new BytesWritable("someValue".getBytes());
+ for (int i = 1; i < 1000; i += 2) {
+ oddWriter.append(new IntWritable(i), value);
+ }
+ oddWriter.close();
+
+ @SuppressWarnings("deprecation")
+ Writer evenWriter = new Writer(ns.getConf(), ns, new Path(root, "even").toString(), IntWritable.class, BytesWritable.class);
+ for (int i = 0; i < 1000; i += 2) {
+ if (i == 10)
+ continue;
+ evenWriter.append(new IntWritable(i), value);
+ }
+ evenWriter.close();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ root.create();
+ }
+
+ private void scan(MultiReader reader, int start) throws IOException {
+ IntWritable key = new IntWritable();
+ BytesWritable value = new BytesWritable();
+
+ for (int i = start + 1; i < 1000; i++) {
+ if (i == 10)
+ continue;
+ assertTrue(reader.next(key, value));
+ assertEquals(i, key.get());
+ }
+ }
+
+ private void scanOdd(MultiReader reader, int start) throws IOException {
+ IntWritable key = new IntWritable();
+ BytesWritable value = new BytesWritable();
+
+ for (int i = start + 2; i < 1000; i += 2) {
+ assertTrue(reader.next(key, value));
+ assertEquals(i, key.get());
+ }
+ }
+
+ @Test
+ public void testMultiReader() throws IOException {
+ Path manyMaps = new Path("file://" + root.getRoot().getAbsolutePath() + "/manyMaps");
+ MultiReader reader = new MultiReader(fs, manyMaps);
+ IntWritable key = new IntWritable();
+ BytesWritable value = new BytesWritable();
+
+ for (int i = 0; i < 1000; i++) {
+ if (i == 10)
+ continue;
+ assertTrue(reader.next(key, value));
+ assertEquals(i, key.get());
+ }
+ assertEquals(value.compareTo(new BytesWritable("someValue".getBytes())), 0);
+ assertFalse(reader.next(key, value));
+
+ key.set(500);
+ assertTrue(reader.seek(key));
+ scan(reader, 500);
+ key.set(10);
+ assertFalse(reader.seek(key));
+ scan(reader, 10);
+ key.set(1000);
+ assertFalse(reader.seek(key));
+ assertFalse(reader.next(key, value));
+ key.set(-1);
+ assertFalse(reader.seek(key));
+ key.set(0);
+ assertTrue(reader.next(key, value));
+ assertEquals(0, key.get());
+ reader.close();
+
+ fs.deleteRecursively(new Path(manyMaps, "even"));
+ reader = new MultiReader(fs, manyMaps);
+ key.set(501);
+ assertTrue(reader.seek(key));
+ scanOdd(reader, 501);
+ key.set(1000);
+ assertFalse(reader.seek(key));
+ assertFalse(reader.next(key, value));
+ key.set(-1);
+ assertFalse(reader.seek(key));
+ key.set(1);
+ assertTrue(reader.next(key, value));
+ assertEquals(1, key.get());
+ reader.close();
+
+ }
+
+}
[4/4] git commit: Merge branch '1.6.0-SNAPSHOT'
Posted by el...@apache.org.
Merge branch '1.6.0-SNAPSHOT'
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ccec95b6
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ccec95b6
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ccec95b6
Branch: refs/heads/master
Commit: ccec95b69cc40bb83627b24ec795b4898fe030ff
Parents: c706289 890ee25
Author: Josh Elser <el...@apache.org>
Authored: Mon Dec 2 17:54:20 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Mon Dec 2 17:54:20 2013 -0500
----------------------------------------------------------------------
.../accumulo/server/fs/VolumeManagerImpl.java | 17 +++++++++++------
.../accumulo/tserver/log/MultiReaderTest.java | 2 --
2 files changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------