Posted to common-commits@hadoop.apache.org by tu...@apache.org on 2012/12/07 02:53:48 UTC
svn commit: r1418161 - in
/hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common: ./
src/main/docs/ src/main/java/ src/main/java/org/apache/hadoop/fs/viewfs/
src/main/java/org/apache/hadoop/io/ src/main/java/org/apache/hadoop/ipc/
src/main...
Author: tucu
Date: Fri Dec 7 01:53:35 2012
New Revision: 1418161
URL: http://svn.apache.org/viewvc?rev=1418161&view=rev
Log:
Merge from trunk
Modified:
hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/CHANGES.txt (contents, props changed)
hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/docs/ (props changed)
hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/ (props changed)
hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/UTF8.java
hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsSystem.java
hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/core/ (props changed)
hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemTestSetup.java
hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsTestSetup.java
hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestUTF8.java
hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java
hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
Modified: hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/CHANGES.txt Fri Dec 7 01:53:35 2012
@@ -289,6 +289,9 @@ Trunk (Unreleased)
HADOOP-9037. Bug in test-patch.sh and precommit build process (Kihwal Lee
via jlowe)
+ HADOOP-9121. InodeTree.java has redundant check for vName while
+ throwing exception. (Arup Malakar via suresh)
+
OPTIMIZATIONS
HADOOP-7761. Improve the performance of raw comparisons. (todd)
@@ -306,6 +309,10 @@ Release 2.0.3-alpha - Unreleased
HADOOP-9020. Add a SASL PLAIN server (daryn via bobby)
+ HADOOP-9090. Support on-demand publish of metrics. (Mostafa Elhemali via
+ suresh)
+
+
IMPROVEMENTS
HADOOP-8789. Tests setLevel(Level.OFF) should be Level.ERROR.
@@ -453,6 +460,14 @@ Release 2.0.3-alpha - Unreleased
HADOOP-9064. Augment DelegationTokenRenewer API to cancel the tokens on
calls to removeRenewAction. (kkambatl via tucu)
+ HADOOP-8958. ViewFs:Non absolute mount name failures when running
+ multiple tests on Windows. (Chris Nauroth via suresh)
+
+ HADOOP-9103. UTF8 class does not properly decode Unicode characters
+ outside the basic multilingual plane. (todd)
+
+ HADOOP-9070. Kerberos SASL server cannot find kerberos key. (daryn via atm)
+
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:r1414747-1418159
Propchange: hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/docs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/docs:r1414747-1418159
Propchange: hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1414747-1418159
Modified: hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java Fri Dec 7 01:53:35 2012
@@ -118,8 +118,7 @@ abstract class InodeTree<T> {
return result;
}
- INode<T> resolveInternal(final String pathComponent)
- throws FileNotFoundException {
+ INode<T> resolveInternal(final String pathComponent) {
return children.get(pathComponent);
}
@@ -336,8 +335,8 @@ abstract class InodeTree<T> {
}
if (!gotMountTableEntry) {
throw new IOException(
- "ViewFs: Cannot initialize: Empty Mount table in config for " +
- vName == null ? "viewfs:///" : ("viewfs://" + vName + "/"));
+ "ViewFs: Cannot initialize: Empty Mount table in config for " +
+ "viewfs://" + vName + "/");
}
}
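
This InodeTree hunk is the HADOOP-9121 fix noted in CHANGES.txt above. In the removed lines, Java operator precedence made the null check dead: + binds more tightly than ==, so the condition parsed as ("ViewFs: ... for " + vName) == null, which is always false, and the ternary result also dropped the message prefix. Per the JIRA title the check was redundant anyway (vName is non-null by this point), so the fix concatenates the message directly. A minimal, self-contained sketch of the pitfall (not Hadoop code):

    public class TernaryPrecedence {
      public static void main(String[] args) {
        String vName = "cluster1";
        // Parses as: (("Empty Mount table in config for " + vName) == null)
        //            ? "viewfs:///" : ("viewfs://" + vName + "/")
        String msg = "Empty Mount table in config for " + vName == null
            ? "viewfs:///" : ("viewfs://" + vName + "/");
        System.out.println(msg); // prints only "viewfs://cluster1/"
      }
    }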
Modified: hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/UTF8.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/UTF8.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/UTF8.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/UTF8.java Fri Dec 7 01:53:35 2012
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
+import org.apache.hadoop.util.StringUtils;
import org.apache.commons.logging.*;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -31,6 +32,9 @@ import org.apache.hadoop.classification.
*
* <p>Also includes utilities for efficiently reading and writing UTF-8.
*
+ * Note that this decodes UTF-8 but actually encodes CESU-8, a variant of
+ * UTF-8: see http://en.wikipedia.org/wiki/CESU-8
+ *
* @deprecated replaced by Text
*/
@Deprecated
@@ -209,6 +213,19 @@ public class UTF8 implements WritableCom
return result;
}
+ /**
+ * Convert a UTF-8 encoded byte array back into a string.
+ *
+ * @throws IOException if the byte array is invalid UTF8
+ */
+ public static String fromBytes(byte[] bytes) throws IOException {
+ DataInputBuffer dbuf = new DataInputBuffer();
+ dbuf.reset(bytes, 0, bytes.length);
+ StringBuilder buf = new StringBuilder(bytes.length);
+ readChars(dbuf, buf, bytes.length);
+ return buf.toString();
+ }
+
/** Read a UTF-8 encoded string.
*
* @see DataInput#readUTF()
@@ -230,18 +247,48 @@ public class UTF8 implements WritableCom
while (i < nBytes) {
byte b = bytes[i++];
if ((b & 0x80) == 0) {
+ // 0b0xxxxxxx: 1-byte sequence
buffer.append((char)(b & 0x7F));
- } else if ((b & 0xE0) != 0xE0) {
+ } else if ((b & 0xE0) == 0xC0) {
+ // 0b110xxxxx: 2-byte sequence
buffer.append((char)(((b & 0x1F) << 6)
| (bytes[i++] & 0x3F)));
- } else {
+ } else if ((b & 0xF0) == 0xE0) {
+ // 0b1110xxxx: 3-byte sequence
buffer.append((char)(((b & 0x0F) << 12)
| ((bytes[i++] & 0x3F) << 6)
| (bytes[i++] & 0x3F)));
+ } else if ((b & 0xF8) == 0xF0) {
+ // 0b11110xxx: 4-byte sequence
+ int codepoint =
+ ((b & 0x07) << 18)
+ | ((bytes[i++] & 0x3F) << 12)
+ | ((bytes[i++] & 0x3F) << 6)
+ | ((bytes[i++] & 0x3F));
+ buffer.append(highSurrogate(codepoint))
+ .append(lowSurrogate(codepoint));
+ } else {
+ // The UTF8 standard describes 5-byte and 6-byte sequences, but
+ // these are no longer allowed as of 2003 (see RFC 3629)
+
+ // Only show the next 6 bytes max in the error message - in case the
+ // buffer is large, this will prevent an exceedingly large message.
+ int endForError = Math.min(i + 5, nBytes);
+ throw new IOException("Invalid UTF8 at " +
+ StringUtils.byteToHexString(bytes, i - 1, endForError));
}
}
}
+ private static char highSurrogate(int codePoint) {
+ return (char) ((codePoint >>> 10)
+ + (Character.MIN_HIGH_SURROGATE - (Character.MIN_SUPPLEMENTARY_CODE_POINT >>> 10)));
+ }
+
+ private static char lowSurrogate(int codePoint) {
+ return (char) ((codePoint & 0x3ff) + Character.MIN_LOW_SURROGATE);
+ }
+
/** Write a UTF-8 encoded string.
*
* @see DataOutput#writeUTF(String)
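
The new 4-byte branch in readChars assembles a supplementary code point and emits it as a UTF-16 surrogate pair; this is what makes characters outside the basic multilingual plane round-trip (HADOOP-9103). The two private helpers reproduce arithmetic that the JDK only exposes as Character.highSurrogate/lowSurrogate starting in Java 7, presumably why they are inlined here. A quick sketch checking the math against the JDK for U+1F431, the code point exercised by TestUTF8 below:

    public class SurrogateCheck {
      public static void main(String[] args) {
        int cp = 0x1F431; // "CAT FACE", outside the basic multilingual plane
        char hi = (char) ((cp >>> 10)
            + (Character.MIN_HIGH_SURROGATE
                - (Character.MIN_SUPPLEMENTARY_CODE_POINT >>> 10)));
        char lo = (char) ((cp & 0x3ff) + Character.MIN_LOW_SURROGATE);
        System.out.printf("%04x %04x%n", (int) hi, (int) lo); // d83d dc31
        char[] jdk = Character.toChars(cp); // the JDK equivalent of both helpers
        System.out.println(jdk[0] == hi && jdk[1] == lo);     // true
      }
    }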
Modified: hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Fri Dec 7 01:53:35 2012
@@ -199,7 +199,8 @@ public abstract class Server {
// in ObjectWritable to efficiently transmit arrays of primitives
// 6 : Made RPC payload header explicit
// 7 : Changed Ipc Connection Header to use Protocol buffers
- public static final byte CURRENT_VERSION = 7;
+ // 8 : SASL server always sends a final response
+ public static final byte CURRENT_VERSION = 8;
/**
* Initial and max size of response buffer
@@ -1220,8 +1221,8 @@ public abstract class Server {
AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
throw e;
}
- if (replyToken == null && authMethod == AuthMethod.PLAIN) {
- // client needs at least response to know if it should use SIMPLE
+ if (saslServer.isComplete() && replyToken == null) {
+ // send final response for success
replyToken = new byte[0];
}
if (replyToken != null) {
@@ -1392,7 +1393,7 @@ public abstract class Server {
}
private AuthMethod initializeAuthContext(AuthMethod authMethod)
- throws IOException {
+ throws IOException, InterruptedException {
try {
if (enabledAuthMethods.contains(authMethod)) {
saslServer = createSaslServer(authMethod);
@@ -1425,8 +1426,7 @@ public abstract class Server {
}
private SaslServer createSaslServer(AuthMethod authMethod)
- throws IOException {
- SaslServer saslServer = null;
+ throws IOException, InterruptedException {
String hostname = null;
String saslProtocol = null;
CallbackHandler saslCallback = null;
@@ -1462,10 +1462,23 @@ public abstract class Server {
"Server does not support SASL " + authMethod);
}
- String mechanism = authMethod.getMechanismName();
- saslServer = Sasl.createSaslServer(
- mechanism, saslProtocol, hostname,
- SaslRpcServer.SASL_PROPS, saslCallback);
+ return createSaslServer(authMethod.getMechanismName(), saslProtocol,
+ hostname, saslCallback);
+ }
+
+ private SaslServer createSaslServer(final String mechanism,
+ final String protocol,
+ final String hostname,
+ final CallbackHandler callback
+ ) throws IOException, InterruptedException {
+ SaslServer saslServer = UserGroupInformation.getCurrentUser().doAs(
+ new PrivilegedExceptionAction<SaslServer>() {
+ @Override
+ public SaslServer run() throws SaslException {
+ return Sasl.createSaslServer(mechanism, protocol, hostname,
+ SaslRpcServer.SASL_PROPS, callback);
+ }
+ });
if (saslServer == null) {
throw new AccessControlException(
"Unable to find SASL server implementation for " + mechanism);
Modified: hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsSystem.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsSystem.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsSystem.java Fri Dec 7 01:53:35 2012
@@ -91,6 +91,17 @@ public abstract class MetricsSystem impl
public abstract void register(Callback callback);
/**
+ * Requests an immediate publish of all metrics from sources to sinks.
+ *
+ * This is a "soft" request: the expectation is that a best effort will be
+ * done to synchronously snapshot the metrics from all the sources and put
+ * them in all the sinks (including flushing the sinks) before returning to
+ * the caller. If this can't be accomplished in reasonable time it's OK to
+ * return to the caller before everything is done.
+ */
+ public abstract void publishMetricsNow();
+
+ /**
* Shutdown the metrics system completely (usually during server shutdown.)
* The MetricsSystemMXBean will be unregistered.
* @return true if shutdown completed
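
A hypothetical caller of the new HADOOP-9090 API, for components that need a one-shot flush instead of waiting for the next timer period; the prefix and registration details are illustrative:

    import org.apache.hadoop.metrics2.MetricsSystem;
    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;

    public class OnDemandPublish {
      public static void main(String[] args) {
        MetricsSystem ms = DefaultMetricsSystem.initialize("demo");
        // ... register sources and sinks as usual ...
        ms.publishMetricsNow(); // best-effort synchronous snapshot + flush
        ms.stop();
      }
    }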
Modified: hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java Fri Dec 7 01:53:35 2012
@@ -19,6 +19,7 @@
package org.apache.hadoop.metrics2.impl;
import java.util.Random;
+import java.util.concurrent.*;
import static com.google.common.base.Preconditions.*;
@@ -48,6 +49,7 @@ class MetricsSinkAdapter implements Sink
private volatile boolean stopping = false;
private volatile boolean inError = false;
private final int period, firstRetryDelay, retryCount;
+ private final long oobPutTimeout;
private final float retryBackoff;
private final MetricsRegistry registry = new MetricsRegistry("sinkadapter");
private final MutableStat latency;
@@ -69,6 +71,8 @@ class MetricsSinkAdapter implements Sink
this.period = checkArg(period, period > 0, "period");
firstRetryDelay = checkArg(retryDelay, retryDelay > 0, "retry delay");
this.retryBackoff = checkArg(retryBackoff, retryBackoff>1, "retry backoff");
+ oobPutTimeout = (long)
+ (firstRetryDelay * Math.pow(retryBackoff, retryCount) * 1000);
this.retryCount = retryCount;
this.queue = new SinkQueue<MetricsBuffer>(checkArg(queueCapacity,
queueCapacity > 0, "queue capacity"));
@@ -95,6 +99,23 @@ class MetricsSinkAdapter implements Sink
}
return true; // OK
}
+
+ public boolean putMetricsImmediate(MetricsBuffer buffer) {
+ WaitableMetricsBuffer waitableBuffer =
+ new WaitableMetricsBuffer(buffer);
+ if (!queue.enqueue(waitableBuffer)) {
+ LOG.warn(name + " has a full queue and can't consume the given metrics.");
+ dropped.incr();
+ return false;
+ }
+ if (!waitableBuffer.waitTillNotified(oobPutTimeout)) {
+ LOG.warn(name +
+ " couldn't fulfill an immediate putMetrics request in time." +
+ " Abandoning.");
+ return false;
+ }
+ return true;
+ }
void publishMetricsFromQueue() {
int retryDelay = firstRetryDelay;
@@ -158,6 +179,9 @@ class MetricsSinkAdapter implements Sink
sink.flush();
latency.add(Time.now() - ts);
}
+ if (buffer instanceof WaitableMetricsBuffer) {
+ ((WaitableMetricsBuffer)buffer).notifyAnyWaiters();
+ }
LOG.debug("Done");
}
@@ -191,4 +215,26 @@ class MetricsSinkAdapter implements Sink
MetricsSink sink() {
return sink;
}
+
+ static class WaitableMetricsBuffer extends MetricsBuffer {
+ private final Semaphore notificationSemaphore =
+ new Semaphore(0);
+
+ public WaitableMetricsBuffer(MetricsBuffer metricsBuffer) {
+ super(metricsBuffer);
+ }
+
+ public boolean waitTillNotified(long millisecondsToWait) {
+ try {
+ return notificationSemaphore.tryAcquire(millisecondsToWait,
+ TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+
+ public void notifyAnyWaiters() {
+ notificationSemaphore.release();
+ }
+ }
}
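
putMetricsImmediate supplies the synchronous half of publishMetricsNow: the caller enqueues a WaitableMetricsBuffer and blocks on its zero-permit semaphore, and the sink's consumer thread releases the semaphore once the buffer has been pushed and flushed. The timeout is sized to the sink's worst-case retry schedule, firstRetryDelay * retryBackoff^retryCount seconds. A stripped-down sketch of the handshake (not Hadoop code):

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    public class WaitableItem {
      private final Semaphore done = new Semaphore(0); // no permits yet

      // Consumer thread calls this once the item is fully processed.
      public void notifyAnyWaiters() { done.release(); }

      // Producer blocks here; true if the consumer finished in time.
      public boolean waitTillNotified(long millis) {
        try {
          return done.tryAcquire(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // restore interrupt status
          return false;
        }
      }
    }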
Modified: hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java Fri Dec 7 01:53:35 2012
@@ -344,9 +344,19 @@ public class MetricsSystemImpl extends M
synchronized void onTimerEvent() {
logicalTime += period;
if (sinks.size() > 0) {
- publishMetrics(sampleMetrics());
+ publishMetrics(sampleMetrics(), false);
}
}
+
+ /**
+ * Requests an immediate publish of all metrics from sources to sinks.
+ */
+ @Override
+ public void publishMetricsNow() {
+ if (sinks.size() > 0) {
+ publishMetrics(sampleMetrics(), true);
+ }
+ }
/**
* Sample all the sources for a snapshot of metrics/tags
@@ -380,12 +390,20 @@ public class MetricsSystemImpl extends M
/**
* Publish a metrics snapshot to all the sinks
* @param buffer the metrics snapshot to publish
+ * @param immediate indicates that we should publish metrics immediately
+ * instead of using a separate thread.
*/
- synchronized void publishMetrics(MetricsBuffer buffer) {
+ synchronized void publishMetrics(MetricsBuffer buffer, boolean immediate) {
int dropped = 0;
for (MetricsSinkAdapter sa : sinks.values()) {
long startTime = Time.now();
- dropped += sa.putMetrics(buffer, logicalTime) ? 0 : 1;
+ boolean result;
+ if (immediate) {
+ result = sa.putMetricsImmediate(buffer);
+ } else {
+ result = sa.putMetrics(buffer, logicalTime);
+ }
+ dropped += result ? 0 : 1;
publishStat.add(Time.now() - startTime);
}
droppedPubAll.incr(dropped);
Modified: hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java Fri Dec 7 01:53:35 2012
@@ -28,6 +28,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.util.ReflectionUtils;
/** The class represents a cluster of computer with a tree hierarchical
* network topology.
@@ -52,6 +55,19 @@ public class NetworkTopology {
super(msg);
}
}
+
+ /**
+ * Get an instance of NetworkTopology based on the value of the configuration
+ * parameter net.topology.impl.
+ *
+ * @param conf the configuration to be used
+ * @return an instance of NetworkTopology
+ */
+ public static NetworkTopology getInstance(Configuration conf){
+ return ReflectionUtils.newInstance(
+ conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
+ NetworkTopology.class, NetworkTopology.class), conf);
+ }
/** InnerNode represents a switch/router of a data center or rack.
* Different from a leaf node, it has non-null children.
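
The new factory lets a deployment swap in a custom topology implementation through the net.topology.impl key. A hypothetical use; MyTopology is illustrative, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
    import org.apache.hadoop.net.NetworkTopology;

    public class TopologyFactoryDemo {
      public static class MyTopology extends NetworkTopology { }

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // With the key unset, the stock NetworkTopology is returned:
        System.out.println(
            NetworkTopology.getInstance(conf).getClass().getSimpleName());
        // A subclass can be substituted through configuration:
        conf.setClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
            MyTopology.class, NetworkTopology.class);
        System.out.println(
            NetworkTopology.getInstance(conf).getClass().getSimpleName());
      }
    }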
Propchange: hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/core/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/core:r1414747-1418159
Modified: hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemTestSetup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemTestSetup.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemTestSetup.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemTestSetup.java Fri Dec 7 01:53:35 2012
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystemTe
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.viewfs.ConfigUtil;
+import org.apache.hadoop.util.Shell;
import org.mortbay.log.Log;
@@ -123,8 +124,11 @@ public class ViewFileSystemTestSetup {
* in the target file system.
*/
static void linkUpFirstComponents(Configuration conf, String path, FileSystem fsTarget, String info) {
- int indexOf2ndSlash = path.indexOf('/', 1);
- String firstComponent = path.substring(0, indexOf2ndSlash);
+ int indexOfEnd = path.indexOf('/', 1);
+ if (Shell.WINDOWS) {
+ indexOfEnd = path.indexOf('/', indexOfEnd + 1);
+ }
+ String firstComponent = path.substring(0, indexOfEnd);
URI linkTarget = fsTarget.makeQualified(new Path(firstComponent)).toUri();
ConfigUtil.addLink(conf, firstComponent, linkTarget);
Log.info("Added link for " + info + " "
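
The Shell.WINDOWS branch exists because on Windows the qualified local test root carries a drive letter, e.g. /C:/tmp/testRoot, so the first mountable component has to span two slashes instead of one. A worked example with hypothetical paths:

    public class FirstComponent {
      static String firstComponent(String path, boolean windows) {
        int indexOfEnd = path.indexOf('/', 1);
        if (windows) {
          indexOfEnd = path.indexOf('/', indexOfEnd + 1);
        }
        return path.substring(0, indexOfEnd);
      }

      public static void main(String[] args) {
        System.out.println(firstComponent("/tmp/testRoot", false));   // /tmp
        System.out.println(firstComponent("/C:/tmp/testRoot", true)); // /C:/tmp
      }
    }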
Modified: hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsTestSetup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsTestSetup.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsTestSetup.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsTestSetup.java Fri Dec 7 01:53:35 2012
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileContextT
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.viewfs.ConfigUtil;
+import org.apache.hadoop.util.Shell;
import org.mortbay.log.Log;
@@ -120,8 +121,11 @@ public class ViewFsTestSetup {
*/
static void linkUpFirstComponents(Configuration conf, String path,
FileContext fsTarget, String info) {
- int indexOf2ndSlash = path.indexOf('/', 1);
- String firstComponent = path.substring(0, indexOf2ndSlash);
+ int indexOfEnd = path.indexOf('/', 1);
+ if (Shell.WINDOWS) {
+ indexOfEnd = path.indexOf('/', indexOfEnd + 1);
+ }
+ String firstComponent = path.substring(0, indexOfEnd);
URI linkTarget = fsTarget.makeQualified(new Path(firstComponent)).toUri();
ConfigUtil.addLink(conf, firstComponent, linkTarget);
Log.info("Added link for " + info + " "
Modified: hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestUTF8.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestUTF8.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestUTF8.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestUTF8.java Fri Dec 7 01:53:35 2012
@@ -19,8 +19,12 @@
package org.apache.hadoop.io;
import junit.framework.TestCase;
+import java.io.IOException;
import java.util.Random;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.StringUtils;
+
/** Unit tests for UTF8. */
@SuppressWarnings("deprecation")
public class TestUTF8 extends TestCase {
@@ -92,5 +96,55 @@ public class TestUTF8 extends TestCase {
assertEquals(s, new String(dob.getData(), 2, dob.getLength()-2, "UTF-8"));
}
-
+
+ /**
+ * Test encoding and decoding of UTF8 outside the basic multilingual plane.
+ *
+ * This is a regression test for HADOOP-9103.
+ */
+ public void testNonBasicMultilingualPlane() throws Exception {
+ // Test using the "CAT FACE" character (U+1F431)
+ // See http://www.fileformat.info/info/unicode/char/1f431/index.htm
+ String catFace = "\uD83D\uDC31";
+
+ // This encodes to 4 bytes in UTF-8:
+ byte[] encoded = catFace.getBytes("UTF-8");
+ assertEquals(4, encoded.length);
+ assertEquals("f09f90b1", StringUtils.byteToHexString(encoded));
+
+ // Decode back to String using our own decoder
+ String roundTrip = UTF8.fromBytes(encoded);
+ assertEquals(catFace, roundTrip);
+ }
+
+ /**
+ * Test that decoding invalid UTF8 throws an appropriate error message.
+ */
+ public void testInvalidUTF8() throws Exception {
+ byte[] invalid = new byte[] {
+ 0x01, 0x02, (byte)0xff, (byte)0xff, 0x01, 0x02, 0x03, 0x04, 0x05 };
+ try {
+ UTF8.fromBytes(invalid);
+ fail("did not throw an exception");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains(
+ "Invalid UTF8 at ffff01020304", ioe);
+ }
+ }
+
+ /**
+ * Test for a 5-byte UTF8 sequence, which is now considered illegal.
+ */
+ public void test5ByteUtf8Sequence() throws Exception {
+ byte[] invalid = new byte[] {
+ 0x01, 0x02, (byte)0xf8, (byte)0x88, (byte)0x80,
+ (byte)0x80, (byte)0x80, 0x04, 0x05 };
+ try {
+ UTF8.fromBytes(invalid);
+ fail("did not throw an exception");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains(
+ "Invalid UTF8 at f88880808004", ioe);
+ }
+ }
}
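
The expected hex strings in the two negative tests fall out of the error window in readChars: the exception carries at most six bytes starting at the offending lead byte (0xff and 0xf8 match none of the valid lead-byte patterns). A small sketch reproducing the window for the first test, assuming Hadoop's StringUtils is on the classpath:

    import org.apache.hadoop.util.StringUtils;

    public class ErrorWindowDemo {
      public static void main(String[] args) {
        byte[] invalid = {0x01, 0x02, (byte) 0xff, (byte) 0xff,
                          0x01, 0x02, 0x03, 0x04, 0x05};
        int i = 3;                                    // just past the bad lead byte
        int endForError = Math.min(i + 5, invalid.length); // cap at 6 bytes
        System.out.println(
            StringUtils.byteToHexString(invalid, i - 1, endForError));
        // prints: ffff01020304, matching testInvalidUTF8's assertion
      }
    }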
Modified: hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java Fri Dec 7 01:53:35 2012
@@ -29,8 +29,6 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -115,31 +113,23 @@ public class TestGangliaMetrics {
final int expectedCountFromGanglia30 = expectedMetrics.length;
final int expectedCountFromGanglia31 = 2 * expectedMetrics.length;
- // use latch to make sure we received required records before shutting
- // down the MetricSystem
- CountDownLatch latch = new CountDownLatch(
- expectedCountFromGanglia30 + expectedCountFromGanglia31);
-
// Setup test for GangliaSink30
AbstractGangliaSink gsink30 = new GangliaSink30();
gsink30.init(cb.subset("test"));
- MockDatagramSocket mockds30 = new MockDatagramSocket(latch);
+ MockDatagramSocket mockds30 = new MockDatagramSocket();
GangliaMetricsTestHelper.setDatagramSocket(gsink30, mockds30);
// Setup test for GangliaSink31
AbstractGangliaSink gsink31 = new GangliaSink31();
gsink31.init(cb.subset("test"));
- MockDatagramSocket mockds31 = new MockDatagramSocket(latch);
+ MockDatagramSocket mockds31 = new MockDatagramSocket();
GangliaMetricsTestHelper.setDatagramSocket(gsink31, mockds31);
// register the sinks
ms.register("gsink30", "gsink30 desc", gsink30);
ms.register("gsink31", "gsink31 desc", gsink31);
- ms.onTimerEvent(); // trigger something interesting
+ ms.publishMetricsNow(); // publish the metrics
- // wait for all records and the stop MetricSystem. Without this
- // sometime the ms gets shutdown before all the sinks have consumed
- latch.await(200, TimeUnit.MILLISECONDS);
ms.stop();
// check GangliaSink30 data
@@ -198,7 +188,6 @@ public class TestGangliaMetrics {
*/
private class MockDatagramSocket extends DatagramSocket {
private ArrayList<byte[]> capture;
- private CountDownLatch latch;
/**
* @throws SocketException
@@ -207,15 +196,6 @@ public class TestGangliaMetrics {
capture = new ArrayList<byte[]>();
}
- /**
- * @param latch
- * @throws SocketException
- */
- public MockDatagramSocket(CountDownLatch latch) throws SocketException {
- this();
- this.latch = latch;
- }
-
/* (non-Javadoc)
* @see java.net.DatagramSocket#send(java.net.DatagramPacket)
*/
@@ -225,9 +205,6 @@ public class TestGangliaMetrics {
byte[] bytes = new byte[p.getLength()];
System.arraycopy(p.getData(), p.getOffset(), bytes, 0, p.getLength());
capture.add(bytes);
-
- // decrement the latch
- latch.countDown();
}
/**
Modified: hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java?rev=1418161&r1=1418160&r2=1418161&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java Fri Dec 7 01:53:35 2012
@@ -18,7 +18,11 @@
package org.apache.hadoop.metrics2.impl;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import javax.annotation.Nullable;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -26,9 +30,11 @@ import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.runners.MockitoJUnitRunner;
+
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
+import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import org.apache.commons.configuration.SubsetConfiguration;
@@ -36,6 +42,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.MetricsException;
import static org.apache.hadoop.test.MoreAsserts.*;
+
+import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsSource;
@@ -47,6 +55,7 @@ import org.apache.hadoop.metrics2.lib.Me
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.util.StringUtils;
/**
* Test the MetricsSystemImpl class
@@ -72,7 +81,7 @@ public class TestMetricsSystemImpl {
}
@Test public void testInitFirst() throws Exception {
- ConfigBuilder cb = new ConfigBuilder().add("*.period", 8)
+ new ConfigBuilder().add("*.period", 8)
//.add("test.sink.plugin.urls", getPluginUrlsAsString())
.add("test.sink.test.class", TestSink.class.getName())
.add("test.*.source.filter.exclude", "s0")
@@ -93,8 +102,9 @@ public class TestMetricsSystemImpl {
MetricsSink sink2 = mock(MetricsSink.class);
ms.registerSink("sink1", "sink1 desc", sink1);
ms.registerSink("sink2", "sink2 desc", sink2);
- ms.onTimerEvent(); // trigger something interesting
+ ms.publishMetricsNow(); // publish the metrics
ms.stop();
+ ms.shutdown();
verify(sink1, times(2)).putMetrics(r1.capture());
List<MetricsRecord> mr1 = r1.getAllValues();
@@ -104,6 +114,177 @@ public class TestMetricsSystemImpl {
assertEquals("output", mr1, mr2);
}
+ @Test public void testMultiThreadedPublish() throws Exception {
+ new ConfigBuilder().add("*.period", 80)
+ .add("test.sink.Collector.queue.capacity", "20")
+ .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+ final MetricsSystemImpl ms = new MetricsSystemImpl("Test");
+ ms.start();
+ final int numThreads = 10;
+ final CollectingSink sink = new CollectingSink(numThreads);
+ ms.registerSink("Collector",
+ "Collector of values from all threads.", sink);
+ final TestSource[] sources = new TestSource[numThreads];
+ final Thread[] threads = new Thread[numThreads];
+ final String[] results = new String[numThreads];
+ final CyclicBarrier barrier1 = new CyclicBarrier(numThreads),
+ barrier2 = new CyclicBarrier(numThreads);
+ for (int i = 0; i < numThreads; i++) {
+ sources[i] = ms.register("threadSource" + i,
+ "A source of my threaded goodness.",
+ new TestSource("threadSourceRec" + i));
+ threads[i] = new Thread(new Runnable() {
+ private boolean safeAwait(int mySource, CyclicBarrier barrier) {
+ try {
+ barrier.await(2, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ results[mySource] = "Interrupted";
+ return false;
+ } catch (BrokenBarrierException e) {
+ results[mySource] = "Broken Barrier";
+ return false;
+ } catch (TimeoutException e) {
+ results[mySource] = "Timed out on barrier";
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void run() {
+ int mySource = Integer.parseInt(Thread.currentThread().getName());
+ if (sink.collected[mySource].get() != 0L) {
+ results[mySource] = "Someone else collected my metric!";
+ return;
+ }
+ // Wait for all the threads to come here so we can hammer
+ // the system at the same time
+ if (!safeAwait(mySource, barrier1)) return;
+ sources[mySource].g1.set(230);
+ ms.publishMetricsNow();
+ // Since some other thread may have snatched my metric,
+ // I need to wait for the threads to finish before checking.
+ if (!safeAwait(mySource, barrier2)) return;
+ if (sink.collected[mySource].get() != 230L) {
+ results[mySource] = "Metric not collected!";
+ return;
+ }
+ results[mySource] = "Passed";
+ }
+ }, "" + i);
+ }
+ for (Thread t : threads)
+ t.start();
+ for (Thread t : threads)
+ t.join();
+ assertEquals(0L, ms.droppedPubAll.value());
+ assertTrue(StringUtils.join("\n", Arrays.asList(results)),
+ Iterables.all(Arrays.asList(results), new Predicate<String>() {
+ @Override
+ public boolean apply(@Nullable String input) {
+ return input.equalsIgnoreCase("Passed");
+ }
+ }));
+ ms.stop();
+ ms.shutdown();
+ }
+
+ private static class CollectingSink implements MetricsSink {
+ private final AtomicLong[] collected;
+
+ public CollectingSink(int capacity) {
+ collected = new AtomicLong[capacity];
+ for (int i = 0; i < capacity; i++) {
+ collected[i] = new AtomicLong();
+ }
+ }
+
+ @Override
+ public void init(SubsetConfiguration conf) {
+ }
+
+ @Override
+ public void putMetrics(MetricsRecord record) {
+ final String prefix = "threadSourceRec";
+ if (record.name().startsWith(prefix)) {
+ final int recordNumber = Integer.parseInt(
+ record.name().substring(prefix.length()));
+ ArrayList<String> names = new ArrayList<String>();
+ for (AbstractMetric m : record.metrics()) {
+ if (m.name().equalsIgnoreCase("g1")) {
+ collected[recordNumber].set(m.value().longValue());
+ return;
+ }
+ names.add(m.name());
+ }
+ }
+ }
+
+ @Override
+ public void flush() {
+ }
+ }
+
+ @Test public void testHangingSink() {
+ new ConfigBuilder().add("*.period", 8)
+ .add("test.sink.test.class", TestSink.class.getName())
+ .add("test.sink.hanging.retry.delay", "1")
+ .add("test.sink.hanging.retry.backoff", "1.01")
+ .add("test.sink.hanging.retry.count", "0")
+ .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+ MetricsSystemImpl ms = new MetricsSystemImpl("Test");
+ ms.start();
+ TestSource s = ms.register("s3", "s3 desc", new TestSource("s3rec"));
+ s.c1.incr();
+ HangingSink hanging = new HangingSink();
+ ms.registerSink("hanging", "Hang the sink!", hanging);
+ ms.publishMetricsNow();
+ assertEquals(1L, ms.droppedPubAll.value());
+ assertFalse(hanging.getInterrupted());
+ ms.stop();
+ ms.shutdown();
+ assertTrue(hanging.getInterrupted());
+ assertTrue("The sink didn't get called after its first hang " +
+ "for subsequent records.", hanging.getGotCalledSecondTime());
+ }
+
+ private static class HangingSink implements MetricsSink {
+ private volatile boolean interrupted;
+ private boolean gotCalledSecondTime;
+ private boolean firstTime = true;
+
+ public boolean getGotCalledSecondTime() {
+ return gotCalledSecondTime;
+ }
+
+ public boolean getInterrupted() {
+ return interrupted;
+ }
+
+ @Override
+ public void init(SubsetConfiguration conf) {
+ }
+
+ @Override
+ public void putMetrics(MetricsRecord record) {
+ // No need to hang every time, just the first record.
+ if (!firstTime) {
+ gotCalledSecondTime = true;
+ return;
+ }
+ firstTime = false;
+ try {
+ Thread.sleep(10 * 1000);
+ } catch (InterruptedException ex) {
+ interrupted = true;
+ }
+ }
+
+ @Override
+ public void flush() {
+ }
+ }
+
@Test public void testRegisterDups() {
MetricsSystem ms = new MetricsSystemImpl();
TestSource ts1 = new TestSource("ts1");
@@ -116,6 +297,7 @@ public class TestMetricsSystemImpl {
MetricsSource s2 = ms.getSource("ts1");
assertNotNull(s2);
assertNotSame(s1, s2);
+ ms.shutdown();
}
@Test(expected=MetricsException.class) public void testRegisterDupError() {