You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2013/10/30 23:22:15 UTC
svn commit: r1537330 [3/7] - in
/hadoop/common/branches/YARN-321/hadoop-common-project: ./
hadoop-annotations/ hadoop-auth/ hadoop-common/ hadoop-common/dev-support/
hadoop-common/src/ hadoop-common/src/main/bin/ hadoop-common/src/main/conf/
hadoop-com...
Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1531125
Merged /hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java:r1519784-1537326
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java Wed Oct 30 22:21:59 2013
@@ -258,5 +258,9 @@ public class CommonConfigurationKeysPubl
public static final String HADOOP_SSL_ENABLED_KEY = "hadoop.ssl.enabled";
public static final boolean HADOOP_SSL_ENABLED_DEFAULT = false;
+
+ // HTTP policies to be used in configuration
+ public static final String HTTP_POLICY_HTTP_ONLY = "HTTP_ONLY";
+ public static final String HTTP_POLICY_HTTPS_ONLY = "HTTPS_ONLY";
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java Wed Oct 30 22:21:59 2013
@@ -1,4 +1,5 @@
/**
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,17 +20,29 @@ package org.apache.hadoop.fs;
import java.io.*;
import java.nio.ByteBuffer;
+import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.fs.ByteBufferUtil;
+import org.apache.hadoop.util.IdentityHashStore;
/** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
* and buffers input through a {@link BufferedInputStream}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataInputStream extends DataInputStream
- implements Seekable, PositionedReadable, Closeable,
- ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead {
+ implements Seekable, PositionedReadable, Closeable,
+ ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
+ HasEnhancedByteBufferAccess {
+ /**
+ * Map ByteBuffers that we have handed out to readers to ByteBufferPool
+ * objects
+ */
+ private final IdentityHashStore<ByteBuffer, ByteBufferPool>
+ extendedReadBuffers
+ = new IdentityHashStore<ByteBuffer, ByteBufferPool>(0);
public FSDataInputStream(InputStream in)
throws IOException {
@@ -167,4 +180,45 @@ public class FSDataInputStream extends D
"support setting the drop-behind caching setting.");
}
}
+
+ @Override
+ public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
+ EnumSet<ReadOption> opts)
+ throws IOException, UnsupportedOperationException {
+ try {
+ return ((HasEnhancedByteBufferAccess)in).read(bufferPool,
+ maxLength, opts);
+ }
+ catch (ClassCastException e) {
+ ByteBuffer buffer = ByteBufferUtil.
+ fallbackRead(this, bufferPool, maxLength);
+ if (buffer != null) {
+ extendedReadBuffers.put(buffer, bufferPool);
+ }
+ return buffer;
+ }
+ }
+
+ private static final EnumSet<ReadOption> EMPTY_READ_OPTIONS_SET =
+ EnumSet.noneOf(ReadOption.class);
+
+ final public ByteBuffer read(ByteBufferPool bufferPool, int maxLength)
+ throws IOException, UnsupportedOperationException {
+ return read(bufferPool, maxLength, EMPTY_READ_OPTIONS_SET);
+ }
+
+ @Override
+ public void releaseBuffer(ByteBuffer buffer) {
+ try {
+ ((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer);
+ }
+ catch (ClassCastException e) {
+ ByteBufferPool bufferPool = extendedReadBuffers.remove( buffer);
+ if (bufferPool == null) {
+ throw new IllegalArgumentException("tried to release a buffer " +
+ "that was not created by this stream.");
+ }
+ bufferPool.putBuffer(buffer);
+ }
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java Wed Oct 30 22:21:59 2013
@@ -18,9 +18,11 @@
package org.apache.hadoop.fs;
import java.io.*;
+import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ZeroCopyUnavailableException;
/****************************************************************
* FSInputStream is a generic old InputStream with a little bit
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java Wed Oct 30 22:21:59 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.lang.ref.WeakReference;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
@@ -31,6 +32,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -2502,28 +2504,149 @@ public abstract class FileSystem extends
}
}
+ /**
+ * Tracks statistics about how many reads, writes, and so forth have been
+ * done in a FileSystem.
+ *
+ * Since there is only one of these objects per FileSystem, there will
+ * typically be many threads writing to this object. Almost every operation
+ * on an open file will involve a write to this object. In contrast, reading
+ * statistics is done infrequently by most programs, and not at all by others.
+ * Hence, this is optimized for writes.
+ *
+ * Each thread writes to its own thread-local area of memory. This removes
+ * contention and allows us to scale up to many, many threads. To read
+ * statistics, the reader thread totals up the contents of all of the
+ * thread-local data areas.
+ */
public static final class Statistics {
+ /**
+ * Statistics data.
+ *
+ * There is only a single writer to thread-local StatisticsData objects.
+ * Hence, volatile is adequate here-- we do not need AtomicLong or similar
+ * to prevent lost updates.
+ * The Java specification guarantees that updates to volatile longs will
+ * be perceived as atomic with respect to other threads, which is all we
+ * need.
+ */
+ private static class StatisticsData {
+ volatile long bytesRead;
+ volatile long bytesWritten;
+ volatile int readOps;
+ volatile int largeReadOps;
+ volatile int writeOps;
+ /**
+ * Stores a weak reference to the thread owning this StatisticsData.
+ * This allows us to remove StatisticsData objects that pertain to
+ * threads that no longer exist.
+ */
+ final WeakReference<Thread> owner;
+
+ StatisticsData(WeakReference<Thread> owner) {
+ this.owner = owner;
+ }
+
+ /**
+ * Add another StatisticsData object to this one.
+ */
+ void add(StatisticsData other) {
+ this.bytesRead += other.bytesRead;
+ this.bytesWritten += other.bytesWritten;
+ this.readOps += other.readOps;
+ this.largeReadOps += other.largeReadOps;
+ this.writeOps += other.writeOps;
+ }
+
+ /**
+ * Negate the values of all statistics.
+ */
+ void negate() {
+ this.bytesRead = -this.bytesRead;
+ this.bytesWritten = -this.bytesWritten;
+ this.readOps = -this.readOps;
+ this.largeReadOps = -this.largeReadOps;
+ this.writeOps = -this.writeOps;
+ }
+
+ @Override
+ public String toString() {
+ return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
+ + readOps + " read ops, " + largeReadOps + " large read ops, "
+ + writeOps + " write ops";
+ }
+ }
+
+ private interface StatisticsAggregator<T> {
+ void accept(StatisticsData data);
+ T aggregate();
+ }
+
private final String scheme;
- private AtomicLong bytesRead = new AtomicLong();
- private AtomicLong bytesWritten = new AtomicLong();
- private AtomicInteger readOps = new AtomicInteger();
- private AtomicInteger largeReadOps = new AtomicInteger();
- private AtomicInteger writeOps = new AtomicInteger();
+
+ /**
+ * rootData is data that doesn't belong to any thread, but will be added
+ * to the totals. This is useful for making copies of Statistics objects,
+ * and for storing data that pertains to threads that have been garbage
+ * collected. Protected by the Statistics lock.
+ */
+ private final StatisticsData rootData;
+
+ /**
+ * Thread-local data.
+ */
+ private final ThreadLocal<StatisticsData> threadData;
+ /**
+ * List of all thread-local data areas. Protected by the Statistics lock.
+ */
+ private LinkedList<StatisticsData> allData;
+
public Statistics(String scheme) {
this.scheme = scheme;
+ this.rootData = new StatisticsData(null);
+ this.threadData = new ThreadLocal<StatisticsData>();
+ this.allData = null;
}
/**
* Copy constructor.
*
- * @param st
- * The input Statistics object which is cloned.
+ * @param other The input Statistics object which is cloned.
*/
- public Statistics(Statistics st) {
- this.scheme = st.scheme;
- this.bytesRead = new AtomicLong(st.bytesRead.longValue());
- this.bytesWritten = new AtomicLong(st.bytesWritten.longValue());
+ public Statistics(Statistics other) {
+ this.scheme = other.scheme;
+ this.rootData = new StatisticsData(null);
+ other.visitAll(new StatisticsAggregator<Void>() {
+ @Override
+ public void accept(StatisticsData data) {
+ rootData.add(data);
+ }
+
+ public Void aggregate() {
+ return null;
+ }
+ });
+ this.threadData = new ThreadLocal<StatisticsData>();
+ }
+
+ /**
+ * Get or create the thread-local data associated with the current thread.
+ */
+ private StatisticsData getThreadData() {
+ StatisticsData data = threadData.get();
+ if (data == null) {
+ data = new StatisticsData(
+ new WeakReference<Thread>(Thread.currentThread()));
+ threadData.set(data);
+ synchronized(this) {
+ if (allData == null) {
+ allData = new LinkedList<StatisticsData>();
+ }
+ allData.add(data);
+ }
+ }
+ return data;
}
/**
@@ -2531,7 +2654,7 @@ public abstract class FileSystem extends
* @param newBytes the additional bytes read
*/
public void incrementBytesRead(long newBytes) {
- bytesRead.getAndAdd(newBytes);
+ getThreadData().bytesRead += newBytes;
}
/**
@@ -2539,7 +2662,7 @@ public abstract class FileSystem extends
* @param newBytes the additional bytes written
*/
public void incrementBytesWritten(long newBytes) {
- bytesWritten.getAndAdd(newBytes);
+ getThreadData().bytesWritten += newBytes;
}
/**
@@ -2547,7 +2670,7 @@ public abstract class FileSystem extends
* @param count number of read operations
*/
public void incrementReadOps(int count) {
- readOps.getAndAdd(count);
+ getThreadData().readOps += count;
}
/**
@@ -2555,7 +2678,7 @@ public abstract class FileSystem extends
* @param count number of large read operations
*/
public void incrementLargeReadOps(int count) {
- largeReadOps.getAndAdd(count);
+ getThreadData().largeReadOps += count;
}
/**
@@ -2563,7 +2686,38 @@ public abstract class FileSystem extends
* @param count number of write operations
*/
public void incrementWriteOps(int count) {
- writeOps.getAndAdd(count);
+ getThreadData().writeOps += count;
+ }
+
+ /**
+ * Apply the given aggregator to all StatisticsData objects associated with
+ * this Statistics object.
+ *
+ * For each StatisticsData object, we will call accept on the visitor.
+ * Finally, at the end, we will call aggregate to get the final total.
+ *
+ * @param The visitor to use.
+ * @return The total.
+ */
+ private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
+ visitor.accept(rootData);
+ if (allData != null) {
+ for (Iterator<StatisticsData> iter = allData.iterator();
+ iter.hasNext(); ) {
+ StatisticsData data = iter.next();
+ visitor.accept(data);
+ if (data.owner.get() == null) {
+ /*
+ * If the thread that created this thread-local data no
+ * longer exists, remove the StatisticsData from our list
+ * and fold the values into rootData.
+ */
+ rootData.add(data);
+ iter.remove();
+ }
+ }
+ }
+ return visitor.aggregate();
}
/**
@@ -2571,7 +2725,18 @@ public abstract class FileSystem extends
* @return the number of bytes
*/
public long getBytesRead() {
- return bytesRead.get();
+ return visitAll(new StatisticsAggregator<Long>() {
+ private long bytesRead = 0;
+
+ @Override
+ public void accept(StatisticsData data) {
+ bytesRead += data.bytesRead;
+ }
+
+ public Long aggregate() {
+ return bytesRead;
+ }
+ });
}
/**
@@ -2579,7 +2744,18 @@ public abstract class FileSystem extends
* @return the number of bytes
*/
public long getBytesWritten() {
- return bytesWritten.get();
+ return visitAll(new StatisticsAggregator<Long>() {
+ private long bytesWritten = 0;
+
+ @Override
+ public void accept(StatisticsData data) {
+ bytesWritten += data.bytesWritten;
+ }
+
+ public Long aggregate() {
+ return bytesWritten;
+ }
+ });
}
/**
@@ -2587,7 +2763,19 @@ public abstract class FileSystem extends
* @return number of read operations
*/
public int getReadOps() {
- return readOps.get() + largeReadOps.get();
+ return visitAll(new StatisticsAggregator<Integer>() {
+ private int readOps = 0;
+
+ @Override
+ public void accept(StatisticsData data) {
+ readOps += data.readOps;
+ readOps += data.largeReadOps;
+ }
+
+ public Integer aggregate() {
+ return readOps;
+ }
+ });
}
/**
@@ -2596,7 +2784,18 @@ public abstract class FileSystem extends
* @return number of large read operations
*/
public int getLargeReadOps() {
- return largeReadOps.get();
+ return visitAll(new StatisticsAggregator<Integer>() {
+ private int largeReadOps = 0;
+
+ @Override
+ public void accept(StatisticsData data) {
+ largeReadOps += data.largeReadOps;
+ }
+
+ public Integer aggregate() {
+ return largeReadOps;
+ }
+ });
}
/**
@@ -2605,22 +2804,70 @@ public abstract class FileSystem extends
* @return number of write operations
*/
public int getWriteOps() {
- return writeOps.get();
+ return visitAll(new StatisticsAggregator<Integer>() {
+ private int writeOps = 0;
+
+ @Override
+ public void accept(StatisticsData data) {
+ writeOps += data.writeOps;
+ }
+
+ public Integer aggregate() {
+ return writeOps;
+ }
+ });
}
+
@Override
public String toString() {
- return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
- + readOps + " read ops, " + largeReadOps + " large read ops, "
- + writeOps + " write ops";
+ return visitAll(new StatisticsAggregator<String>() {
+ private StatisticsData total = new StatisticsData(null);
+
+ @Override
+ public void accept(StatisticsData data) {
+ total.add(data);
+ }
+
+ public String aggregate() {
+ return total.toString();
+ }
+ });
}
-
+
/**
- * Reset the counts of bytes to 0.
+ * Resets all statistics to 0.
+ *
+ * In order to reset, we add up all the thread-local statistics data, and
+ * set rootData to the negative of that.
+ *
+ * This may seem like a counterintuitive way to reset the statsitics. Why
+ * can't we just zero out all the thread-local data? Well, thread-local
+ * data can only be modified by the thread that owns it. If we tried to
+ * modify the thread-local data from this thread, our modification might get
+ * interleaved with a read-modify-write operation done by the thread that
+ * owns the data. That would result in our update getting lost.
+ *
+ * The approach used here avoids this problem because it only ever reads
+ * (not writes) the thread-local data. Both reads and writes to rootData
+ * are done under the lock, so we're free to modify rootData from any thread
+ * that holds the lock.
*/
public void reset() {
- bytesWritten.set(0);
- bytesRead.set(0);
+ visitAll(new StatisticsAggregator<Void>() {
+ private StatisticsData total = new StatisticsData(null);
+
+ @Override
+ public void accept(StatisticsData data) {
+ total.add(data);
+ }
+
+ public Void aggregate() {
+ total.negate();
+ rootData.add(total);
+ return null;
+ }
+ });
}
/**
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java Wed Oct 30 22:21:59 2013
@@ -1239,6 +1239,9 @@ public class FileUtil {
List<String> classPathEntryList = new ArrayList<String>(
classPathEntries.length);
for (String classPathEntry: classPathEntries) {
+ if (classPathEntry.length() == 0) {
+ continue;
+ }
if (classPathEntry.endsWith("*")) {
// Append all jars that match the wildcard
Path globPath = new Path(classPathEntry).suffix("{.jar,.JAR}");
@@ -1252,7 +1255,14 @@ public class FileUtil {
}
} else {
// Append just this entry
- String classPathEntryUrl = new File(classPathEntry).toURI().toURL()
+ File fileCpEntry = null;
+ if(!new Path(classPathEntry).isAbsolute()) {
+ fileCpEntry = new File(workingDir, classPathEntry);
+ }
+ else {
+ fileCpEntry = new File(classPathEntry);
+ }
+ String classPathEntryUrl = fileCpEntry.toURI().toURL()
.toExternalForm();
// File.toURI only appends trailing '/' if it can determine that it is a
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java Wed Oct 30 22:21:59 2013
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.fs;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -50,38 +51,26 @@ class Globber {
this.filter = filter;
}
- private FileStatus getFileStatus(Path path) {
+ private FileStatus getFileStatus(Path path) throws IOException {
try {
if (fs != null) {
return fs.getFileStatus(path);
} else {
return fc.getFileStatus(path);
}
- } catch (IOException e) {
+ } catch (FileNotFoundException e) {
return null;
}
}
- private FileStatus getFileLinkStatus(Path path) {
- try {
- if (fs != null) {
- return fs.getFileLinkStatus(path);
- } else {
- return fc.getFileLinkStatus(path);
- }
- } catch (IOException e) {
- return null;
- }
- }
-
- private FileStatus[] listStatus(Path path) {
+ private FileStatus[] listStatus(Path path) throws IOException {
try {
if (fs != null) {
return fs.listStatus(path);
} else {
return fc.util().listStatus(path);
}
- } catch (IOException e) {
+ } catch (FileNotFoundException e) {
return new FileStatus[0];
}
}
@@ -95,6 +84,15 @@ class Globber {
}
/**
+ * Convert a path component that contains backslash ecape sequences to a
+ * literal string. This is necessary when you want to explicitly refer to a
+ * path that contains globber metacharacters.
+ */
+ private static String unescapePathComponent(String name) {
+ return name.replaceAll("\\\\(.)", "$1");
+ }
+
+ /**
* Translate an absolute path into a list of path components.
* We merge double slashes into a single slash here.
* POSIX root path, i.e. '/', does not get an entry in the list.
@@ -134,18 +132,6 @@ class Globber {
return authority ;
}
- /**
- * The glob filter builds a regexp per path component. If the component
- * does not contain a shell metachar, then it falls back to appending the
- * raw string to the list of built up paths. This raw path needs to have
- * the quoting removed. Ie. convert all occurrences of "\X" to "X"
- * @param name of the path component
- * @return the unquoted path component
- */
- private static String unquotePathComponent(String name) {
- return name.replaceAll("\\\\(.)", "$1");
- }
-
public FileStatus[] glob() throws IOException {
// First we get the scheme and authority of the pattern that was passed
// in.
@@ -189,39 +175,54 @@ class Globber {
new Path(scheme, authority, Path.SEPARATOR)));
}
- for (String component : components) {
+ for (int componentIdx = 0; componentIdx < components.size();
+ componentIdx++) {
ArrayList<FileStatus> newCandidates =
new ArrayList<FileStatus>(candidates.size());
- GlobFilter globFilter = new GlobFilter(component);
+ GlobFilter globFilter = new GlobFilter(components.get(componentIdx));
+ String component = unescapePathComponent(components.get(componentIdx));
if (globFilter.hasPattern()) {
sawWildcard = true;
}
if (candidates.isEmpty() && sawWildcard) {
+ // Optimization: if there are no more candidates left, stop examining
+ // the path components. We can only do this if we've already seen
+ // a wildcard component-- otherwise, we still need to visit all path
+ // components in case one of them is a wildcard.
break;
}
- for (FileStatus candidate : candidates) {
- FileStatus resolvedCandidate = candidate;
- if (candidate.isSymlink()) {
- // We have to resolve symlinks, because otherwise we don't know
- // whether they are directories.
- resolvedCandidate = getFileStatus(candidate.getPath());
- }
- if (resolvedCandidate == null ||
- resolvedCandidate.isDirectory() == false) {
- continue;
+ if ((componentIdx < components.size() - 1) &&
+ (!globFilter.hasPattern())) {
+ // Optimization: if this is not the terminal path component, and we
+ // are not matching against a glob, assume that it exists. If it
+ // doesn't exist, we'll find out later when resolving a later glob
+ // or the terminal path component.
+ for (FileStatus candidate : candidates) {
+ candidate.setPath(new Path(candidate.getPath(), component));
}
- // For components without pattern, we get its FileStatus directly
- // using getFileLinkStatus for two reasons:
- // 1. It should be faster to only get FileStatus needed rather than
- // get all children.
- // 2. Some special filesystem directories (e.g. HDFS snapshot
- // directories) are not returned by listStatus, but do exist if
- // checked explicitly via getFileLinkStatus.
+ continue;
+ }
+ for (FileStatus candidate : candidates) {
if (globFilter.hasPattern()) {
FileStatus[] children = listStatus(candidate.getPath());
+ if (children.length == 1) {
+ // If we get back only one result, this could be either a listing
+ // of a directory with one entry, or it could reflect the fact
+ // that what we listed resolved to a file.
+ //
+ // Unfortunately, we can't just compare the returned paths to
+ // figure this out. Consider the case where you have /a/b, where
+ // b is a symlink to "..". In that case, listing /a/b will give
+ // back "/a/b" again. If we just went by returned pathname, we'd
+ // incorrectly conclude that /a/b was a file and should not match
+ // /a/*/*. So we use getFileStatus of the path we just listed to
+ // disambiguate.
+ if (!getFileStatus(candidate.getPath()).isDirectory()) {
+ continue;
+ }
+ }
for (FileStatus child : children) {
// Set the child path based on the parent path.
- // This keeps the symlinks in our path.
child.setPath(new Path(candidate.getPath(),
child.getPath().getName()));
if (globFilter.accept(child.getPath())) {
@@ -229,13 +230,17 @@ class Globber {
}
}
} else {
- Path p = new Path(candidate.getPath(), unquotePathComponent(component));
- FileStatus s = getFileLinkStatus(p);
- if (s != null) {
- s.setPath(p);
- newCandidates.add(s);
- }
- }
+ // When dealing with non-glob components, use getFileStatus
+ // instead of listStatus. This is an optimization, but it also
+ // is necessary for correctness in HDFS, since there are some
+ // special HDFS directories like .reserved and .snapshot that are
+ // not visible to listStatus, but which do exist. (See HADOOP-9877)
+ FileStatus childStatus = getFileStatus(
+ new Path(candidate.getPath(), component));
+ if (childStatus != null) {
+ newCandidates.add(childStatus);
+ }
+ }
}
candidates = newCandidates;
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java Wed Oct 30 22:21:59 2013
@@ -17,20 +17,6 @@
*/
package org.apache.hadoop.fs;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.HashMap;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -40,6 +26,14 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Progressable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.util.*;
+
/**
* This is an implementation of the Hadoop Archive
* Filesystem. This archive Filesystem has index files
@@ -53,7 +47,7 @@ import org.apache.hadoop.util.Progressab
* index for ranges of hashcodes.
*/
-public class HarFileSystem extends FilterFileSystem {
+public class HarFileSystem extends FileSystem {
private static final Log LOG = LogFactory.getLog(HarFileSystem.class);
@@ -75,11 +69,13 @@ public class HarFileSystem extends Filte
// pointer into the static metadata cache
private HarMetaData metadata;
+ private FileSystem fs;
+
/**
* public construction of harfilesystem
- *
*/
public HarFileSystem() {
+ // Must call #initialize() method to set the underlying file system
}
/**
@@ -96,10 +92,11 @@ public class HarFileSystem extends Filte
/**
* Constructor to create a HarFileSystem with an
* underlying filesystem.
- * @param fs
+ * @param fs underlying file system
*/
public HarFileSystem(FileSystem fs) {
- super(fs);
+ this.fs = fs;
+ this.statistics = fs.statistics;
}
private synchronized void initializeMetadataCache(Configuration conf) {
@@ -171,6 +168,11 @@ public class HarFileSystem extends Filte
}
}
+ @Override
+ public Configuration getConf() {
+ return fs.getConf();
+ }
+
// get the version of the filesystem from the masterindex file
// the version is currently not useful since its the first version
// of archives
@@ -236,8 +238,7 @@ public class HarFileSystem extends Filte
throw new IOException("query component in Path not supported " + rawURI);
}
- URI tmp = null;
-
+ URI tmp;
try {
// convert <scheme>-<host> to <scheme>://<host>
URI baseUri = new URI(authority.replaceFirst("-", "://"));
@@ -256,7 +257,7 @@ public class HarFileSystem extends Filte
return URLDecoder.decode(str, "UTF-8");
}
- private String decodeFileName(String fname)
+ private String decodeFileName(String fname)
throws UnsupportedEncodingException {
int version = metadata.getVersion();
if (version == 2 || version == 3){
@@ -272,19 +273,30 @@ public class HarFileSystem extends Filte
public Path getWorkingDirectory() {
return new Path(uri.toString());
}
-
+
+ @Override
+ public Path getInitialWorkingDirectory() {
+ return getWorkingDirectory();
+ }
+
+ @Override
+ public FsStatus getStatus(Path p) throws IOException {
+ return fs.getStatus(p);
+ }
+
/**
* Create a har specific auth
* har-underlyingfs:port
- * @param underLyingURI the uri of underlying
+ * @param underLyingUri the uri of underlying
* filesystem
* @return har specific auth
*/
private String getHarAuth(URI underLyingUri) {
String auth = underLyingUri.getScheme() + "-";
if (underLyingUri.getHost() != null) {
- auth += underLyingUri.getHost() + ":";
+ auth += underLyingUri.getHost();
if (underLyingUri.getPort() != -1) {
+ auth += ":";
auth += underLyingUri.getPort();
}
}
@@ -293,7 +305,21 @@ public class HarFileSystem extends Filte
}
return auth;
}
-
+
+ /**
+ * Used for delegation token related functionality. Must delegate to
+ * underlying file system.
+ */
+ @Override
+ protected URI getCanonicalUri() {
+ return fs.getCanonicalUri();
+ }
+
+ @Override
+ protected URI canonicalizeUri(URI uri) {
+ return fs.canonicalizeUri(uri);
+ }
+
/**
* Returns the uri of this filesystem.
* The uri is of the form
@@ -304,6 +330,16 @@ public class HarFileSystem extends Filte
return this.uri;
}
+ @Override
+ protected void checkPath(Path path) {
+ fs.checkPath(path);
+ }
+
+ @Override
+ public Path resolvePath(Path p) throws IOException {
+ return fs.resolvePath(p);
+ }
+
/**
* this method returns the path
* inside the har filesystem.
@@ -418,7 +454,7 @@ public class HarFileSystem extends Filte
/**
* Get block locations from the underlying fs and fix their
* offsets and lengths.
- * @param file the input filestatus to get block locations
+ * @param file the input file status to get block locations
* @param start the start of the desired range in the contained file
* @param len the length of the desired range
* @return block locations for this segment of file
@@ -440,8 +476,7 @@ public class HarFileSystem extends Filte
}
/**
- * the hash of the path p inside iniside
- * the filesystem
+ * the hash of the path p inside the filesystem
* @param p the path in the harfilesystem
* @return the hash code of the path.
*/
@@ -474,13 +509,9 @@ public class HarFileSystem extends Filte
* the parent path directory
* @param statuses
* the list to add the children filestatuses to
- * @param children
- * the string list of children for this parent
- * @param archiveIndexStat
- * the archive index filestatus
*/
- private void fileStatusesInIndex(HarStatus parent, List<FileStatus> statuses,
- List<String> children) throws IOException {
+ private void fileStatusesInIndex(HarStatus parent, List<FileStatus> statuses)
+ throws IOException {
String parentString = parent.getName();
if (!parentString.endsWith(Path.SEPARATOR)){
parentString += Path.SEPARATOR;
@@ -546,7 +577,7 @@ public class HarFileSystem extends Filte
// stored in a single line in the index files
// the format is of the form
// filename "dir"/"file" partFileName startIndex length
- // <space seperated children>
+ // <space separated children>
private class HarStatus {
boolean isDir;
String name;
@@ -665,7 +696,6 @@ public class HarFileSystem extends Filte
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
// get the fs DataInputStream for the underlying file
HarStatus hstatus = getFileHarStatus(f);
- // we got it.. woo hooo!!!
if (hstatus.isDir()) {
throw new FileNotFoundException(f + " : not a file in " +
archivePath);
@@ -674,20 +704,39 @@ public class HarFileSystem extends Filte
hstatus.getPartName()),
hstatus.getStartIndex(), hstatus.getLength(), bufferSize);
}
-
+
+ /**
+ * Used for delegation token related functionality. Must delegate to
+ * underlying file system.
+ */
+ @Override
+ public FileSystem[] getChildFileSystems() {
+ return new FileSystem[]{fs};
+ }
+
@Override
- public FSDataOutputStream create(Path f,
- FsPermission permission,
- boolean overwrite,
- int bufferSize,
- short replication,
- long blockSize,
+ public FSDataOutputStream create(Path f, FsPermission permission,
+ boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
throw new IOException("Har: create not allowed.");
}
-
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
+ int bufferSize, short replication, long blockSize, Progressable progress)
+ throws IOException {
+ throw new IOException("Har: create not allowed.");
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+ throw new IOException("Har: append not allowed.");
+ }
+
@Override
public void close() throws IOException {
+ super.close();
if (fs != null) {
try {
fs.close();
@@ -703,9 +752,19 @@ public class HarFileSystem extends Filte
*/
@Override
public boolean setReplication(Path src, short replication) throws IOException{
- throw new IOException("Har: setreplication not allowed");
+ throw new IOException("Har: setReplication not allowed");
}
-
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ throw new IOException("Har: rename not allowed");
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f) throws IOException {
+ throw new IOException("Har: append not allowed");
+ }
+
/**
* Not implemented.
*/
@@ -713,7 +772,7 @@ public class HarFileSystem extends Filte
public boolean delete(Path f, boolean recursive) throws IOException {
throw new IOException("Har: delete not allowed");
}
-
+
/**
* liststatus returns the children of a directory
* after looking up the index files.
@@ -732,7 +791,7 @@ public class HarFileSystem extends Filte
throw new FileNotFoundException("File " + f + " not found in " + archivePath);
}
if (hstatus.isDir()) {
- fileStatusesInIndex(hstatus, statuses, hstatus.children);
+ fileStatusesInIndex(hstatus, statuses);
} else {
statuses.add(toFileStatus(hstatus, null));
}
@@ -747,7 +806,7 @@ public class HarFileSystem extends Filte
public Path getHomeDirectory() {
return new Path(uri.toString());
}
-
+
@Override
public void setWorkingDirectory(Path newDir) {
//does nothing.
@@ -765,11 +824,17 @@ public class HarFileSystem extends Filte
* not implemented.
*/
@Override
- public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws
- IOException {
+ public void copyFromLocalFile(boolean delSrc, boolean overwrite,
+ Path src, Path dst) throws IOException {
throw new IOException("Har: copyfromlocalfile not allowed");
}
-
+
+ @Override
+ public void copyFromLocalFile(boolean delSrc, boolean overwrite,
+ Path[] srcs, Path dst) throws IOException {
+ throw new IOException("Har: copyfromlocalfile not allowed");
+ }
+
/**
* copies the file in the har filesystem to a local file.
*/
@@ -806,11 +871,16 @@ public class HarFileSystem extends Filte
throw new IOException("Har: setowner not allowed");
}
+ @Override
+ public void setTimes(Path p, long mtime, long atime) throws IOException {
+ throw new IOException("Har: setTimes not allowed");
+ }
+
/**
* Not implemented.
*/
@Override
- public void setPermission(Path p, FsPermission permisssion)
+ public void setPermission(Path p, FsPermission permission)
throws IOException {
throw new IOException("Har: setPermission not allowed");
}
@@ -828,11 +898,15 @@ public class HarFileSystem extends Filte
private long position, start, end;
//The underlying data input stream that the
// underlying filesystem will return.
- private FSDataInputStream underLyingStream;
+ private final FSDataInputStream underLyingStream;
//one byte buffer
- private byte[] oneBytebuff = new byte[1];
+ private final byte[] oneBytebuff = new byte[1];
+
HarFsInputStream(FileSystem fs, Path path, long start,
long length, int bufferSize) throws IOException {
+ if (length < 0) {
+ throw new IllegalArgumentException("Negative length ["+length+"]");
+ }
underLyingStream = fs.open(path, bufferSize);
underLyingStream.seek(start);
// the start of this file in the part file
@@ -846,7 +920,7 @@ public class HarFileSystem extends Filte
@Override
public synchronized int available() throws IOException {
long remaining = end - underLyingStream.getPos();
- if (remaining > (long)Integer.MAX_VALUE) {
+ if (remaining > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
return (int) remaining;
@@ -878,10 +952,14 @@ public class HarFileSystem extends Filte
return (ret <= 0) ? -1: (oneBytebuff[0] & 0xff);
}
+ // NB: currently this method actually never executed becusae
+ // java.io.DataInputStream.read(byte[]) directly delegates to
+ // method java.io.InputStream.read(byte[], int, int).
+ // However, potentially it can be invoked, so leave it intact for now.
@Override
public synchronized int read(byte[] b) throws IOException {
- int ret = read(b, 0, b.length);
- if (ret != -1) {
+ final int ret = read(b, 0, b.length);
+ if (ret > 0) {
position += ret;
}
return ret;
@@ -899,7 +977,7 @@ public class HarFileSystem extends Filte
newlen = (int) (end - position);
}
// end case
- if (newlen == 0)
+ if (newlen == 0)
return ret;
ret = underLyingStream.read(b, offset, newlen);
position += ret;
@@ -910,15 +988,19 @@ public class HarFileSystem extends Filte
public synchronized long skip(long n) throws IOException {
long tmpN = n;
if (tmpN > 0) {
- if (position + tmpN > end) {
- tmpN = end - position;
- }
+ final long actualRemaining = end - position;
+ if (tmpN > actualRemaining) {
+ tmpN = actualRemaining;
+ }
underLyingStream.seek(tmpN + position);
position += tmpN;
return tmpN;
- }
- return (tmpN < 0)? -1 : 0;
- }
+ }
+ // NB: the contract is described in java.io.InputStream.skip(long):
+ // this method returns the number of bytes actually skipped, so,
+ // the return value should never be negative.
+ return 0;
+ }
@Override
public synchronized long getPos() throws IOException {
@@ -926,18 +1008,27 @@ public class HarFileSystem extends Filte
}
@Override
- public synchronized void seek(long pos) throws IOException {
- if (pos < 0 || (start + pos > end)) {
- throw new IOException("Failed to seek: EOF");
- }
+ public synchronized void seek(final long pos) throws IOException {
+ validatePosition(pos);
position = start + pos;
underLyingStream.seek(position);
}
+ private void validatePosition(final long pos) throws IOException {
+ if (pos < 0) {
+ throw new IOException("Negative position: "+pos);
+ }
+ final long length = end - start;
+ if (pos > length) {
+ throw new IOException("Position behind the end " +
+ "of the stream (length = "+length+"): " + pos);
+ }
+ }
+
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
- //do not need to implement this
- // hdfs in itself does seektonewsource
+ // do not need to implement this
+ // hdfs in itself does seektonewsource
// while reading.
return false;
}
@@ -950,7 +1041,12 @@ public class HarFileSystem extends Filte
throws IOException {
int nlength = length;
if (start + nlength + pos > end) {
- nlength = (int) (end - (start + pos));
+ // length corrected to the real remaining length:
+ nlength = (int) (end - start - pos);
+ }
+ if (nlength <= 0) {
+ // EOS:
+ return -1;
}
return underLyingStream.read(pos + start , b, offset, nlength);
}
@@ -973,14 +1069,12 @@ public class HarFileSystem extends Filte
}
@Override
- public void setReadahead(Long readahead)
- throws IOException, UnsupportedEncodingException {
+ public void setReadahead(Long readahead) throws IOException {
underLyingStream.setReadahead(readahead);
}
@Override
- public void setDropBehind(Boolean dropBehind)
- throws IOException, UnsupportedEncodingException {
+ public void setDropBehind(Boolean dropBehind) throws IOException {
underLyingStream.setDropBehind(dropBehind);
}
}
@@ -998,19 +1092,6 @@ public class HarFileSystem extends Filte
long length, int bufsize) throws IOException {
super(new HarFsInputStream(fs, p, start, length, bufsize));
}
-
- /**
- * constructor for har input stream.
- * @param fs the underlying filesystem
- * @param p the path in the underlying file system
- * @param start the start position in the part file
- * @param length the length of valid data in the part file.
- * @throws IOException
- */
- public HarFSDataInputStream(FileSystem fs, Path p, long start, long length)
- throws IOException {
- super(new HarFsInputStream(fs, p, start, length, 0));
- }
}
private class HarMetaData {
@@ -1057,7 +1138,7 @@ public class HarFileSystem extends Filte
}
private void parseMetaData() throws IOException {
- Text line;
+ Text line = new Text();
long read;
FSDataInputStream in = null;
LineReader lin = null;
@@ -1067,7 +1148,6 @@ public class HarFileSystem extends Filte
FileStatus masterStat = fs.getFileStatus(masterIndexPath);
masterIndexTimestamp = masterStat.getModificationTime();
lin = new LineReader(in, getConf());
- line = new Text();
read = lin.readLine(line);
// the first line contains the version of the index file
@@ -1081,7 +1161,7 @@ public class HarFileSystem extends Filte
}
// each line contains a hashcode range and the index file name
- String[] readStr = null;
+ String[] readStr;
while(read < masterStat.getLen()) {
int b = lin.readLine(line);
read += b;
@@ -1093,6 +1173,9 @@ public class HarFileSystem extends Filte
endHash));
line.clear();
}
+ } catch (IOException ioe) {
+ LOG.warn("Encountered exception ", ioe);
+ throw ioe;
} finally {
IOUtils.cleanup(LOG, lin, in);
}
@@ -1144,4 +1227,43 @@ public class HarFileSystem extends Filte
return size() > MAX_ENTRIES;
}
}
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public FsServerDefaults getServerDefaults() throws IOException {
+ return fs.getServerDefaults();
+ }
+
+ @Override
+ public FsServerDefaults getServerDefaults(Path f) throws IOException {
+ return fs.getServerDefaults(f);
+ }
+
+ @Override
+ public long getUsed() throws IOException{
+ return fs.getUsed();
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public long getDefaultBlockSize() {
+ return fs.getDefaultBlockSize();
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public long getDefaultBlockSize(Path f) {
+ return fs.getDefaultBlockSize(f);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public short getDefaultReplication() {
+ return fs.getDefaultReplication();
+ }
+
+ @Override
+ public short getDefaultReplication(Path f) {
+ return fs.getDefaultReplication(f);
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Path.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Path.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Path.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Path.java Wed Oct 30 22:21:59 2013
@@ -218,10 +218,13 @@ public class Path implements Comparable
*/
public static Path mergePaths(Path path1, Path path2) {
String path2Str = path2.toUri().getPath();
- if(hasWindowsDrive(path2Str)) {
- path2Str = path2Str.substring(path2Str.indexOf(':')+1);
- }
- return new Path(path1 + path2Str);
+ path2Str = path2Str.substring(startPositionWithoutWindowsDrive(path2Str));
+ // Add path components explicitly, because simply concatenating two path
+ // string is not safe, for example:
+ // "/" + "/foo" yields "//foo", which will be parsed as authority in Path
+ return new Path(path1.toUri().getScheme(),
+ path1.toUri().getAuthority(),
+ path1.toUri().getPath() + path2Str);
}
/**
@@ -247,8 +250,8 @@ public class Path implements Comparable
}
// trim trailing slash from non-root path (ignoring windows drive)
- int minLength = hasWindowsDrive(path) ? 4 : 1;
- if (path.length() > minLength && path.endsWith("/")) {
+ int minLength = startPositionWithoutWindowsDrive(path) + 1;
+ if (path.length() > minLength && path.endsWith(SEPARATOR)) {
path = path.substring(0, path.length()-1);
}
@@ -259,6 +262,14 @@ public class Path implements Comparable
return (WINDOWS && hasDriveLetterSpecifier.matcher(path).find());
}
+ private static int startPositionWithoutWindowsDrive(String path) {
+ if (hasWindowsDrive(path)) {
+ return path.charAt(0) == SEPARATOR_CHAR ? 3 : 2;
+ } else {
+ return 0;
+ }
+ }
+
/**
* Determine whether a given path string represents an absolute path on
* Windows. e.g. "C:/a/b" is an absolute path. "C:a/b" is not.
@@ -270,13 +281,11 @@ public class Path implements Comparable
*/
public static boolean isWindowsAbsolutePath(final String pathString,
final boolean slashed) {
- int start = (slashed ? 1 : 0);
-
- return
- hasWindowsDrive(pathString) &&
- pathString.length() >= (start + 3) &&
- ((pathString.charAt(start + 2) == SEPARATOR_CHAR) ||
- (pathString.charAt(start + 2) == '\\'));
+ int start = startPositionWithoutWindowsDrive(pathString);
+ return start > 0
+ && pathString.length() > start
+ && ((pathString.charAt(start) == SEPARATOR_CHAR) ||
+ (pathString.charAt(start) == '\\'));
}
/** Convert this to a URI. */
@@ -300,7 +309,7 @@ public class Path implements Comparable
* True if the path component (i.e. directory) of this URI is absolute.
*/
public boolean isUriPathAbsolute() {
- int start = hasWindowsDrive(uri.getPath()) ? 3 : 0;
+ int start = startPositionWithoutWindowsDrive(uri.getPath());
return uri.getPath().startsWith(SEPARATOR, start);
}
@@ -334,7 +343,7 @@ public class Path implements Comparable
public Path getParent() {
String path = uri.getPath();
int lastSlash = path.lastIndexOf('/');
- int start = hasWindowsDrive(path) ? 3 : 0;
+ int start = startPositionWithoutWindowsDrive(path);
if ((path.length() == start) || // empty path
(lastSlash == start && path.length() == start+1)) { // at root
return null;
@@ -343,8 +352,7 @@ public class Path implements Comparable
if (lastSlash==-1) {
parent = CUR_DIR;
} else {
- int end = hasWindowsDrive(path) ? 3 : 0;
- parent = path.substring(0, lastSlash==end?end+1:lastSlash);
+ parent = path.substring(0, lastSlash==start?start+1:lastSlash);
}
return new Path(uri.getScheme(), uri.getAuthority(), parent);
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Seekable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Seekable.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Seekable.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Seekable.java Wed Oct 30 22:21:59 2013
@@ -22,7 +22,9 @@ import java.io.*;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-/** Stream that permits seeking. */
+/**
+ * Stream that permits seeking.
+ */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface Seekable {
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Stat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Stat.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Stat.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Stat.java Wed Oct 30 22:21:59 2013
@@ -80,7 +80,7 @@ public class Stat extends Shell {
* @return
*/
public static boolean isAvailable() {
- if (Shell.LINUX || Shell.FREEBSD) {
+ if (Shell.LINUX || Shell.FREEBSD || Shell.MAC) {
return true;
}
return false;
@@ -100,7 +100,7 @@ public class Stat extends Shell {
if (Shell.LINUX) {
return new String[] {
"stat", derefFlag + "c", "%s,%F,%Y,%X,%a,%U,%G,%N", path.toString() };
- } else if (Shell.FREEBSD) {
+ } else if (Shell.FREEBSD || Shell.MAC) {
return new String[] {
"stat", derefFlag + "f", "%z,%HT,%m,%a,%Op,%Su,%Sg,`link' -> `%Y'",
path.toString() };
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java Wed Oct 30 22:21:59 2013
@@ -84,11 +84,16 @@ abstract class CommandWithDestination ex
*/
protected void getLocalDestination(LinkedList<String> args)
throws IOException {
+ String pathString = (args.size() < 2) ? Path.CUR_DIR : args.removeLast();
try {
- String pathString = (args.size() < 2) ? Path.CUR_DIR : args.removeLast();
dst = new PathData(new URI(pathString), getConf());
} catch (URISyntaxException e) {
- throw new IOException("unexpected URISyntaxException", e);
+ if (Path.WINDOWS) {
+ // Unlike URI, PathData knows how to parse Windows drive-letter paths.
+ dst = new PathData(pathString, getConf());
+ } else {
+ throw new IOException("unexpected URISyntaxException", e);
+ }
}
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java Wed Oct 30 22:21:59 2013
@@ -204,13 +204,18 @@ class CopyCommands {
// commands operating on local paths have no need for glob expansion
@Override
protected List<PathData> expandArgument(String arg) throws IOException {
+ List<PathData> items = new LinkedList<PathData>();
try {
- List<PathData> items = new LinkedList<PathData>();
items.add(new PathData(new URI(arg), getConf()));
- return items;
} catch (URISyntaxException e) {
- throw new IOException("unexpected URISyntaxException", e);
+ if (Path.WINDOWS) {
+ // Unlike URI, PathData knows how to parse Windows drive-letter paths.
+ items.add(new PathData(arg, getConf()));
+ } else {
+ throw new IOException("unexpected URISyntaxException", e);
+ }
}
+ return items;
}
@Override
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SetReplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SetReplication.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SetReplication.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SetReplication.java Wed Oct 30 22:21:59 2013
@@ -39,11 +39,14 @@ class SetReplication extends FsCommand {
}
public static final String NAME = "setrep";
- public static final String USAGE = "[-R] [-w] <rep> <path/file> ...";
+ public static final String USAGE = "[-R] [-w] <rep> <path> ...";
public static final String DESCRIPTION =
- "Set the replication level of a file.\n" +
- "The -R flag requests a recursive change of replication level\n" +
- "for an entire tree.";
+ "Set the replication level of a file. If <path> is a directory\n" +
+ "then the command recursively changes the replication factor of\n" +
+ "all files under the directory tree rooted at <path>.\n" +
+ "The -w flag requests that the command wait for the replication\n" +
+ "to complete. This can potentially take a very long time.\n" +
+ "The -R flag is accepted for backwards compatibility. It has no effect.";
protected short newRep = 0;
protected List<PathData> waitList = new LinkedList<PathData>();
@@ -54,7 +57,7 @@ class SetReplication extends FsCommand {
CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "R", "w");
cf.parse(args);
waitOpt = cf.getOpt("w");
- setRecursive(cf.getOpt("R"));
+ setRecursive(true);
try {
newRep = Short.parseShort(args.removeFirst());
@@ -126,4 +129,4 @@ class SetReplication extends FsCommand {
out.println(" done");
}
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SnapshotCommands.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SnapshotCommands.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SnapshotCommands.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SnapshotCommands.java Wed Oct 30 22:21:59 2013
@@ -68,7 +68,7 @@ class SnapshotCommands extends FsCommand
throw new IllegalArgumentException("<snapshotDir> is missing.");
}
if (args.size() > 2) {
- throw new IllegalArgumentException("Too many arguements.");
+ throw new IllegalArgumentException("Too many arguments.");
}
if (args.size() == 2) {
snapshotName = args.removeLast();
@@ -110,7 +110,7 @@ class SnapshotCommands extends FsCommand
@Override
protected void processOptions(LinkedList<String> args) throws IOException {
if (args.size() != 2) {
- throw new IOException("args number not 2: " + args.size());
+ throw new IllegalArgumentException("Incorrect number of arguments.");
}
snapshotName = args.removeLast();
}
@@ -150,7 +150,7 @@ class SnapshotCommands extends FsCommand
@Override
protected void processOptions(LinkedList<String> args) throws IOException {
if (args.size() != 3) {
- throw new IOException("args number not 3: " + args.size());
+ throw new IllegalArgumentException("Incorrect number of arguments.");
}
newName = args.removeLast();
oldName = args.removeLast();
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java Wed Oct 30 22:21:59 2013
@@ -568,6 +568,9 @@ public class ActiveStandbyElector implem
enterNeutralMode();
reJoinElection(0);
break;
+ case SaslAuthenticated:
+ LOG.info("Successfully authenticated to ZooKeeper using SASL.");
+ break;
default:
fatalError("Unexpected Zookeeper watch event state: "
+ event.getState());
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java Wed Oct 30 22:21:59 2013
@@ -63,7 +63,7 @@ public abstract class HAAdmin extends Co
private int rpcTimeoutForChecks = -1;
- private static Map<String, UsageInfo> USAGE =
+ protected final static Map<String, UsageInfo> USAGE =
ImmutableMap.<String, UsageInfo>builder()
.put("-transitionToActive",
new UsageInfo("<serviceId>", "Transitions the service into Active state"))
@@ -91,6 +91,14 @@ public abstract class HAAdmin extends Co
protected PrintStream out = System.out;
private RequestSource requestSource = RequestSource.REQUEST_BY_USER;
+ protected HAAdmin() {
+ super();
+ }
+
+ protected HAAdmin(Configuration conf) {
+ super(conf);
+ }
+
protected abstract HAServiceTarget resolveTarget(String string);
protected String getUsageString() {
@@ -461,9 +469,9 @@ public abstract class HAAdmin extends Co
return 0;
}
- private static class UsageInfo {
- private final String args;
- private final String help;
+ protected static class UsageInfo {
+ public final String args;
+ public final String help;
public UsageInfo(String args, String help) {
this.args = args;
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java Wed Oct 30 22:21:59 2013
@@ -43,13 +43,15 @@ public interface HAServiceProtocol {
public static final long versionID = 1L;
/**
- * An HA service may be in active or standby state. During
- * startup, it is in an unknown INITIALIZING state.
+ * An HA service may be in active or standby state. During startup, it is in
+ * an unknown INITIALIZING state. During shutdown, it is in the STOPPING state
+ * and can no longer return to active/standby states.
*/
public enum HAServiceState {
INITIALIZING("initializing"),
ACTIVE("active"),
- STANDBY("standby");
+ STANDBY("standby"),
+ STOPPING("stopping");
private String name;
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpConfig.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpConfig.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpConfig.java Wed Oct 30 22:21:59 2013
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.http;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -29,26 +28,41 @@ import org.apache.hadoop.fs.CommonConfig
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class HttpConfig {
- private static boolean sslEnabled;
+ private static Policy policy;
+ public enum Policy {
+ HTTP_ONLY,
+ HTTPS_ONLY;
+
+ public static Policy fromString(String value) {
+ if (value.equalsIgnoreCase(CommonConfigurationKeysPublic
+ .HTTP_POLICY_HTTPS_ONLY)) {
+ return HTTPS_ONLY;
+ }
+ return HTTP_ONLY;
+ }
+ }
static {
Configuration conf = new Configuration();
- sslEnabled = conf.getBoolean(
- CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY,
- CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT);
+ boolean sslEnabled = conf.getBoolean(
+ CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY,
+ CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT);
+ policy = sslEnabled ? Policy.HTTPS_ONLY : Policy.HTTP_ONLY;
}
- @VisibleForTesting
- static void setSecure(boolean secure) {
- sslEnabled = secure;
+ public static void setPolicy(Policy policy) {
+ HttpConfig.policy = policy;
}
public static boolean isSecure() {
- return sslEnabled;
+ return policy == Policy.HTTPS_ONLY;
}
public static String getSchemePrefix() {
return (isSecure()) ? "https://" : "http://";
}
+ public static String getScheme(Policy policy) {
+ return policy == Policy.HTTPS_ONLY ? "https://" : "http://";
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer.java Wed Oct 30 22:21:59 2013
@@ -66,9 +66,12 @@ import org.mortbay.io.Buffer;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Handler;
import org.mortbay.jetty.MimeTypes;
+import org.mortbay.jetty.RequestLog;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.handler.ContextHandler;
import org.mortbay.jetty.handler.ContextHandlerCollection;
+import org.mortbay.jetty.handler.RequestLogHandler;
+import org.mortbay.jetty.handler.HandlerCollection;
import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.jetty.security.SslSocketConnector;
import org.mortbay.jetty.servlet.Context;
@@ -337,6 +340,7 @@ public class HttpServer implements Filte
}
listener.setHost(bindAddress);
listener.setPort(port);
+ LOG.info("SSL is enabled on " + toString());
} else {
listenerStartedExternally = true;
listener = connector;
@@ -354,7 +358,18 @@ public class HttpServer implements Filte
final String appDir = getWebAppsPath(name);
ContextHandlerCollection contexts = new ContextHandlerCollection();
- webServer.setHandler(contexts);
+ RequestLog requestLog = HttpRequestLog.getRequestLog(name);
+
+ if (requestLog != null) {
+ RequestLogHandler requestLogHandler = new RequestLogHandler();
+ requestLogHandler.setRequestLog(requestLog);
+ HandlerCollection handlers = new HandlerCollection();
+ handlers.setHandlers(new Handler[] {requestLogHandler, contexts});
+ webServer.setHandler(handlers);
+ }
+ else {
+ webServer.setHandler(contexts);
+ }
webAppContext = new WebAppContext();
webAppContext.setDisplayName(name);
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java Wed Oct 30 22:21:59 2013
@@ -64,7 +64,7 @@ public class RetryInvocationHandler<T> i
this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
}
- RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
+ protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
RetryPolicy defaultPolicy,
Map<String, RetryPolicy> methodNameToPolicyMap) {
this.proxyProvider = proxyProvider;
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java Wed Oct 30 22:21:59 2013
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.ConnectTimeoutException;
@@ -531,6 +532,15 @@ public class RetryPolicies {
this.maxDelayBase = maxDelayBase;
}
+ /**
+ * @return 0 if this is our first failover/retry (i.e., retry immediately),
+ * sleep exponentially otherwise
+ */
+ private long getFailoverOrRetrySleepTime(int times) {
+ return times == 0 ? 0 :
+ calculateExponentialTime(delayMillis, times, maxDelayBase);
+ }
+
@Override
public RetryAction shouldRetry(Exception e, int retries,
int failovers, boolean isIdempotentOrAtMostOnce) throws Exception {
@@ -546,11 +556,8 @@ public class RetryPolicies {
e instanceof StandbyException ||
e instanceof ConnectTimeoutException ||
isWrappedStandbyException(e)) {
- return new RetryAction(
- RetryAction.RetryDecision.FAILOVER_AND_RETRY,
- // retry immediately if this is our first failover, sleep otherwise
- failovers == 0 ? 0 :
- calculateExponentialTime(delayMillis, failovers, maxDelayBase));
+ return new RetryAction(RetryAction.RetryDecision.FAILOVER_AND_RETRY,
+ getFailoverOrRetrySleepTime(failovers));
} else if (e instanceof SocketException ||
(e instanceof IOException && !(e instanceof RemoteException))) {
if (isIdempotentOrAtMostOnce) {
@@ -561,8 +568,14 @@ public class RetryPolicies {
"whether it was invoked");
}
} else {
- return fallbackPolicy.shouldRetry(e, retries, failovers,
- isIdempotentOrAtMostOnce);
+ RetriableException re = getWrappedRetriableException(e);
+ if (re != null) {
+ return new RetryAction(RetryAction.RetryDecision.RETRY,
+ getFailoverOrRetrySleepTime(retries));
+ } else {
+ return fallbackPolicy.shouldRetry(e, retries, failovers,
+ isIdempotentOrAtMostOnce);
+ }
}
}
@@ -596,4 +609,14 @@ public class RetryPolicies {
StandbyException.class);
return unwrapped instanceof StandbyException;
}
+
+ private static RetriableException getWrappedRetriableException(Exception e) {
+ if (!(e instanceof RemoteException)) {
+ return null;
+ }
+ Exception unwrapped = ((RemoteException)e).unwrapRemoteException(
+ RetriableException.class);
+ return unwrapped instanceof RetriableException ?
+ (RetriableException) unwrapped : null;
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Wed Oct 30 22:21:59 2013
@@ -1063,8 +1063,8 @@ public class Client {
if (status == RpcStatusProto.SUCCESS) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value
- call.setRpcResponse(value);
calls.remove(callId);
+ call.setRpcResponse(value);
// verify that length was correct
// only for ProtobufEngine where len can be verified easily
@@ -1098,8 +1098,8 @@ public class Client {
new RemoteException(exceptionClassName, errorMsg) :
new RemoteException(exceptionClassName, errorMsg, erCode));
if (status == RpcStatusProto.ERROR) {
- call.setException(re);
calls.remove(callId);
+ call.setException(re);
} else if (status == RpcStatusProto.FATAL) {
// Close the connection
markClosed(re);
@@ -1166,8 +1166,8 @@ public class Client {
Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
while (itor.hasNext()) {
Call c = itor.next().getValue();
+ itor.remove();
c.setException(closeException); // local exception
- itor.remove();
}
}
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java Wed Oct 30 22:21:59 2013
@@ -76,6 +76,12 @@ public class RetryCache {
this.expirationTime = expirationTime;
}
+ CacheEntry(byte[] clientId, int callId, long expirationTime,
+ boolean success) {
+ this(clientId, callId, expirationTime);
+ this.state = success ? SUCCESS : FAILED;
+ }
+
private static int hashCode(long value) {
return (int)(value ^ (value >>> 32));
}
@@ -147,6 +153,12 @@ public class RetryCache {
this.payload = payload;
}
+ CacheEntryWithPayload(byte[] clientId, int callId, Object payload,
+ long expirationTime, boolean success) {
+ super(clientId, callId, expirationTime, success);
+ this.payload = payload;
+ }
+
/** Override equals to avoid findbugs warnings */
@Override
public boolean equals(Object obj) {
@@ -253,18 +265,20 @@ public class RetryCache {
*/
public void addCacheEntry(byte[] clientId, int callId) {
CacheEntry newEntry = new CacheEntry(clientId, callId, System.nanoTime()
- + expirationTime);
- newEntry.completed(true);
- set.put(newEntry);
+ + expirationTime, true);
+ synchronized(this) {
+ set.put(newEntry);
+ }
}
public void addCacheEntryWithPayload(byte[] clientId, int callId,
Object payload) {
- CacheEntry newEntry = new CacheEntryWithPayload(clientId, callId, payload,
- System.nanoTime() + expirationTime);
// since the entry is loaded from editlog, we can assume it succeeded.
- newEntry.completed(true);
- set.put(newEntry);
+ CacheEntry newEntry = new CacheEntryWithPayload(clientId, callId, payload,
+ System.nanoTime() + expirationTime, true);
+ synchronized(this) {
+ set.put(newEntry);
+ }
}
private static CacheEntry newEntry(long expirationTime) {
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Wed Oct 30 22:21:59 2013
@@ -1292,6 +1292,29 @@ public abstract class Server {
}
}
+ private Throwable getCauseForInvalidToken(IOException e) {
+ Throwable cause = e;
+ while (cause != null) {
+ if (cause instanceof RetriableException) {
+ return (RetriableException) cause;
+ } else if (cause instanceof StandbyException) {
+ return (StandbyException) cause;
+ } else if (cause instanceof InvalidToken) {
+ // FIXME: hadoop method signatures are restricting the SASL
+ // callbacks to only returning InvalidToken, but some services
+ // need to throw other exceptions (ex. NN + StandyException),
+ // so for now we'll tunnel the real exceptions via an
+ // InvalidToken's cause which normally is not set
+ if (cause.getCause() != null) {
+ cause = cause.getCause();
+ }
+ return cause;
+ }
+ cause = cause.getCause();
+ }
+ return e;
+ }
+
private void saslProcess(RpcSaslProto saslMessage)
throws WrappedRpcServerException, IOException, InterruptedException {
if (saslContextEstablished) {
@@ -1304,29 +1327,11 @@ public abstract class Server {
try {
saslResponse = processSaslMessage(saslMessage);
} catch (IOException e) {
- IOException sendToClient = e;
- Throwable cause = e;
- while (cause != null) {
- if (cause instanceof InvalidToken) {
- // FIXME: hadoop method signatures are restricting the SASL
- // callbacks to only returning InvalidToken, but some services
- // need to throw other exceptions (ex. NN + StandyException),
- // so for now we'll tunnel the real exceptions via an
- // InvalidToken's cause which normally is not set
- if (cause.getCause() != null) {
- cause = cause.getCause();
- }
- sendToClient = (IOException) cause;
- break;
- }
- cause = cause.getCause();
- }
rpcMetrics.incrAuthenticationFailures();
- String clientIP = this.toString();
// attempting user could be null
- AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser +
- " (" + e.getLocalizedMessage() + ")");
- throw sendToClient;
+ AUDITLOG.warn(AUTH_FAILED_FOR + this.toString() + ":"
+ + attemptingUser + " (" + e.getLocalizedMessage() + ")");
+ throw (IOException) getCauseForInvalidToken(e);
}
if (saslServer != null && saslServer.isComplete()) {
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/filter/AbstractPatternFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/filter/AbstractPatternFilter.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/filter/AbstractPatternFilter.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/filter/AbstractPatternFilter.java Wed Oct 30 22:21:59 2013
@@ -112,7 +112,7 @@ public abstract class AbstractPatternFil
return false;
}
// Reject if no match in whitelist only mode
- if (ipat != null && epat == null) {
+ if (!includeTagPatterns.isEmpty() && excludeTagPatterns.isEmpty()) {
return false;
}
return true;
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/package-info.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/package-info.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/package-info.java Wed Oct 30 22:21:59 2013
@@ -234,7 +234,7 @@
patterns.
</p>
<p>Similarly, you can specify the <code>record.filter</code> and
- <code>metrics.filter</code> options, which operate at record and metric
+ <code>metric.filter</code> options, which operate at record and metric
level, respectively. Filters can be combined to optimize
the filtering efficiency.</p>
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java Wed Oct 30 22:21:59 2013
@@ -154,4 +154,11 @@ public class CachedDNSToSwitchMapping ex
public void reloadCachedMappings() {
cache.clear();
}
+
+ @Override
+ public void reloadCachedMappings(List<String> names) {
+ for (String name : names) {
+ cache.remove(name);
+ }
+ }
}