You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2012/10/19 04:27:38 UTC
svn commit: r1399950 [8/17] - in
/hadoop/common/branches/HDFS-2802/hadoop-common-project: hadoop-annotations/
hadoop-annotations/src/main/java/org/apache/hadoop/classification/tools/
hadoop-auth-examples/ hadoop-auth/ hadoop-auth/src/main/java/org/apac...
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java Fri Oct 19 02:25:55 2012
@@ -20,15 +20,11 @@ package org.apache.hadoop.io.compress;
import java.io.*;
import java.util.zip.GZIPOutputStream;
-import java.util.zip.GZIPInputStream;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.zlib.*;
-import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
-import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
/**
* This class creates gzip compressors/decompressors.
@@ -66,32 +62,39 @@ public class GzipCodec extends DefaultCo
super(out);
}
+ @Override
public void close() throws IOException {
out.close();
}
+ @Override
public void flush() throws IOException {
out.flush();
}
+ @Override
public void write(int b) throws IOException {
out.write(b);
}
+ @Override
public void write(byte[] data, int offset, int length)
throws IOException {
out.write(data, offset, length);
}
+ @Override
public void finish() throws IOException {
((ResetableGZIPOutputStream) out).finish();
}
+ @Override
public void resetState() throws IOException {
((ResetableGZIPOutputStream) out).resetState();
}
}
+ @Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
return (ZlibFactory.isNativeZlibLoaded(conf)) ?
@@ -100,6 +103,7 @@ public class GzipCodec extends DefaultCo
new GzipOutputStream(out);
}
+ @Override
public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor)
throws IOException {
@@ -110,23 +114,27 @@ public class GzipCodec extends DefaultCo
createOutputStream(out);
}
+ @Override
public Compressor createCompressor() {
return (ZlibFactory.isNativeZlibLoaded(conf))
? new GzipZlibCompressor(conf)
: null;
}
+ @Override
public Class<? extends Compressor> getCompressorType() {
return ZlibFactory.isNativeZlibLoaded(conf)
? GzipZlibCompressor.class
: null;
}
+ @Override
public CompressionInputStream createInputStream(InputStream in)
throws IOException {
return createInputStream(in, null);
}
+ @Override
public CompressionInputStream createInputStream(InputStream in,
Decompressor decompressor)
throws IOException {
@@ -137,18 +145,21 @@ public class GzipCodec extends DefaultCo
conf.getInt("io.file.buffer.size", 4*1024));
}
+ @Override
public Decompressor createDecompressor() {
return (ZlibFactory.isNativeZlibLoaded(conf))
? new GzipZlibDecompressor()
: new BuiltInGzipDecompressor();
}
+ @Override
public Class<? extends Decompressor> getDecompressorType() {
return ZlibFactory.isNativeZlibLoaded(conf)
? GzipZlibDecompressor.class
: BuiltInGzipDecompressor.class;
}
+ @Override
public String getDefaultExtension() {
return ".gz";
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java Fri Oct 19 02:25:55 2012
@@ -24,7 +24,6 @@ import java.io.OutputStream;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.snappy.LoadSnappy;
import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -34,11 +33,6 @@ import org.apache.hadoop.util.NativeCode
* This class creates snappy compressors/decompressors.
*/
public class SnappyCodec implements Configurable, CompressionCodec {
-
- static {
- LoadSnappy.isLoaded();
- }
-
Configuration conf;
/**
@@ -63,11 +57,26 @@ public class SnappyCodec implements Conf
/**
* Are the native snappy libraries loaded & initialized?
- *
- * @return true if loaded & initialized, otherwise false
*/
+ public static void checkNativeCodeLoaded() {
+ if (!NativeCodeLoader.buildSupportsSnappy()) {
+ throw new RuntimeException("native snappy library not available: " +
+ "this version of libhadoop was built without " +
+ "snappy support.");
+ }
+ if (!SnappyCompressor.isNativeCodeLoaded()) {
+ throw new RuntimeException("native snappy library not available: " +
+ "SnappyCompressor has not been loaded.");
+ }
+ if (!SnappyDecompressor.isNativeCodeLoaded()) {
+ throw new RuntimeException("native snappy library not available: " +
+ "SnappyDecompressor has not been loaded.");
+ }
+ }
+
public static boolean isNativeCodeLoaded() {
- return LoadSnappy.isLoaded() && NativeCodeLoader.isNativeCodeLoaded();
+ return SnappyCompressor.isNativeCodeLoaded() &&
+ SnappyDecompressor.isNativeCodeLoaded();
}
/**
@@ -97,9 +106,7 @@ public class SnappyCodec implements Conf
public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor)
throws IOException {
- if (!isNativeCodeLoaded()) {
- throw new RuntimeException("native snappy library not available");
- }
+ checkNativeCodeLoaded();
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
@@ -117,10 +124,7 @@ public class SnappyCodec implements Conf
*/
@Override
public Class<? extends Compressor> getCompressorType() {
- if (!isNativeCodeLoaded()) {
- throw new RuntimeException("native snappy library not available");
- }
-
+ checkNativeCodeLoaded();
return SnappyCompressor.class;
}
@@ -131,9 +135,7 @@ public class SnappyCodec implements Conf
*/
@Override
public Compressor createCompressor() {
- if (!isNativeCodeLoaded()) {
- throw new RuntimeException("native snappy library not available");
- }
+ checkNativeCodeLoaded();
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
@@ -167,10 +169,7 @@ public class SnappyCodec implements Conf
public CompressionInputStream createInputStream(InputStream in,
Decompressor decompressor)
throws IOException {
- if (!isNativeCodeLoaded()) {
- throw new RuntimeException("native snappy library not available");
- }
-
+ checkNativeCodeLoaded();
return new BlockDecompressorStream(in, decompressor, conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
@@ -183,10 +182,7 @@ public class SnappyCodec implements Conf
*/
@Override
public Class<? extends Decompressor> getDecompressorType() {
- if (!isNativeCodeLoaded()) {
- throw new RuntimeException("native snappy library not available");
- }
-
+ checkNativeCodeLoaded();
return SnappyDecompressor.class;
}
@@ -197,9 +193,7 @@ public class SnappyCodec implements Conf
*/
@Override
public Decompressor createDecompressor() {
- if (!isNativeCodeLoaded()) {
- throw new RuntimeException("native snappy library not available");
- }
+ checkNativeCodeLoaded();
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java Fri Oct 19 02:25:55 2012
@@ -338,6 +338,7 @@ public class CBZip2InputStream extends I
}
+ @Override
public int read() throws IOException {
if (this.in != null) {
@@ -372,6 +373,7 @@ public class CBZip2InputStream extends I
*/
+ @Override
public int read(final byte[] dest, final int offs, final int len)
throws IOException {
if (offs < 0) {
@@ -574,6 +576,7 @@ public class CBZip2InputStream extends I
}
}
+ @Override
public void close() throws IOException {
InputStream inShadow = this.in;
if (inShadow != null) {
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java Fri Oct 19 02:25:55 2012
@@ -639,6 +639,7 @@ public class CBZip2OutputStream extends
init();
}
+ @Override
public void write(final int b) throws IOException {
if (this.out != null) {
write0(b);
@@ -704,6 +705,7 @@ public class CBZip2OutputStream extends
/**
* Overriden to close the stream.
*/
+ @Override
protected void finalize() throws Throwable {
finish();
super.finalize();
@@ -726,6 +728,7 @@ public class CBZip2OutputStream extends
}
}
+ @Override
public void close() throws IOException {
if (out != null) {
OutputStream outShadow = this.out;
@@ -739,6 +742,7 @@ public class CBZip2OutputStream extends
}
}
+ @Override
public void flush() throws IOException {
OutputStream outShadow = this.out;
if (outShadow != null) {
@@ -849,6 +853,7 @@ public class CBZip2OutputStream extends
return this.blockSize100k;
}
+ @Override
public void write(final byte[] buf, int offs, final int len)
throws IOException {
if (offs < 0) {
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java Fri Oct 19 02:25:55 2012
@@ -258,6 +258,7 @@ public class Lz4Decompressor implements
return 0;
}
+ @Override
public synchronized void reset() {
finished = false;
compressedDirectBufLen = 0;
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java Fri Oct 19 02:25:55 2012
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.util.NativeCodeLoader;
/**
* A {@link Compressor} based on the snappy compression algorithm.
@@ -51,22 +52,24 @@ public class SnappyCompressor implements
private long bytesRead = 0L;
private long bytesWritten = 0L;
-
+ private static boolean nativeSnappyLoaded = false;
+
static {
- if (LoadSnappy.isLoaded()) {
- // Initialize the native library
+ if (NativeCodeLoader.isNativeCodeLoaded() &&
+ NativeCodeLoader.buildSupportsSnappy()) {
try {
initIDs();
+ nativeSnappyLoaded = true;
} catch (Throwable t) {
- // Ignore failure to load/initialize snappy
- LOG.warn(t.toString());
+ LOG.error("failed to load SnappyCompressor", t);
}
- } else {
- LOG.error("Cannot load " + SnappyCompressor.class.getName() +
- " without snappy library!");
}
}
-
+
+ public static boolean isNativeCodeLoaded() {
+ return nativeSnappyLoaded;
+ }
+
/**
* Creates a new compressor.
*
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java Fri Oct 19 02:25:55 2012
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
/**
* A {@link Decompressor} based on the snappy compression algorithm.
@@ -47,21 +48,24 @@ public class SnappyDecompressor implemen
private int userBufOff = 0, userBufLen = 0;
private boolean finished;
+ private static boolean nativeSnappyLoaded = false;
+
static {
- if (LoadSnappy.isLoaded()) {
- // Initialize the native library
+ if (NativeCodeLoader.isNativeCodeLoaded() &&
+ NativeCodeLoader.buildSupportsSnappy()) {
try {
initIDs();
+ nativeSnappyLoaded = true;
} catch (Throwable t) {
- // Ignore failure to load/initialize snappy
- LOG.warn(t.toString());
+ LOG.error("failed to load SnappyDecompressor", t);
}
- } else {
- LOG.error("Cannot load " + SnappyDecompressor.class.getName() +
- " without snappy library!");
}
}
-
+
+ public static boolean isNativeCodeLoaded() {
+ return nativeSnappyLoaded;
+ }
+
/**
* Creates a new compressor.
*
@@ -257,6 +261,7 @@ public class SnappyDecompressor implemen
return 0;
}
+ @Override
public synchronized void reset() {
finished = false;
compressedDirectBufLen = 0;
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java Fri Oct 19 02:25:55 2012
@@ -122,7 +122,7 @@ public class BuiltInGzipDecompressor imp
// in the first buffer load? (But how else would one do it?)
}
- /** {@inheritDoc} */
+ @Override
public synchronized boolean needsInput() {
if (state == GzipStateLabel.DEFLATE_STREAM) { // most common case
return inflater.needsInput();
@@ -144,6 +144,7 @@ public class BuiltInGzipDecompressor imp
* the bulk deflate stream, which is a performance hit we don't want
* to absorb. (Decompressor now documents this requirement.)
*/
+ @Override
public synchronized void setInput(byte[] b, int off, int len) {
if (b == null) {
throw new NullPointerException();
@@ -175,6 +176,7 @@ public class BuiltInGzipDecompressor imp
* methods below), the deflate stream is never copied; Inflater operates
* directly on the user's buffer.
*/
+ @Override
public synchronized int decompress(byte[] b, int off, int len)
throws IOException {
int numAvailBytes = 0;
@@ -385,7 +387,7 @@ public class BuiltInGzipDecompressor imp
copyBytesToLocal(n); // modifies userBufLen, etc.
if (localBufOff >= 4) { // should be strictly ==
long inputSize = readUIntLE(localBuf, 0);
- if (inputSize != (inflater.getBytesWritten() & 0xffffffff)) {
+ if (inputSize != (inflater.getBytesWritten() & 0xffffffffL)) {
throw new IOException(
"stored gzip size doesn't match decompressed size");
}
@@ -421,16 +423,17 @@ public class BuiltInGzipDecompressor imp
*
* @return the total (non-negative) number of unprocessed bytes in input
*/
+ @Override
public synchronized int getRemaining() {
return userBufLen;
}
- /** {@inheritDoc} */
+ @Override
public synchronized boolean needsDictionary() {
return inflater.needsDictionary();
}
- /** {@inheritDoc} */
+ @Override
public synchronized void setDictionary(byte[] b, int off, int len) {
inflater.setDictionary(b, off, len);
}
@@ -439,6 +442,7 @@ public class BuiltInGzipDecompressor imp
* Returns true if the end of the gzip substream (single "member") has been
* reached.</p>
*/
+ @Override
public synchronized boolean finished() {
return (state == GzipStateLabel.FINISHED);
}
@@ -447,6 +451,7 @@ public class BuiltInGzipDecompressor imp
* Resets everything, including the input buffer, regardless of whether the
* current gzip substream is finished.</p>
*/
+ @Override
public synchronized void reset() {
// could optionally emit INFO message if state != GzipStateLabel.FINISHED
inflater.reset();
@@ -463,7 +468,7 @@ public class BuiltInGzipDecompressor imp
hasHeaderCRC = false;
}
- /** {@inheritDoc} */
+ @Override
public synchronized void end() {
inflater.end();
}
@@ -566,7 +571,7 @@ public class BuiltInGzipDecompressor imp
return ((((long)(b[off+3] & 0xff) << 24) |
((long)(b[off+2] & 0xff) << 16) |
((long)(b[off+1] & 0xff) << 8) |
- ((long)(b[off] & 0xff) )) & 0xffffffff);
+ ((long)(b[off] & 0xff) )) & 0xffffffffL);
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java Fri Oct 19 02:25:55 2012
@@ -48,6 +48,7 @@ public class BuiltInZlibDeflater extends
super();
}
+ @Override
public synchronized int compress(byte[] b, int off, int len)
throws IOException {
return super.deflate(b, off, len);
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibInflater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibInflater.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibInflater.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibInflater.java Fri Oct 19 02:25:55 2012
@@ -39,6 +39,7 @@ public class BuiltInZlibInflater extends
super();
}
+ @Override
public synchronized int decompress(byte[] b, int off, int len)
throws IOException {
try {
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java Fri Oct 19 02:25:55 2012
@@ -259,6 +259,7 @@ public class ZlibCompressor implements C
}
}
+ @Override
public synchronized void setInput(byte[] b, int off, int len) {
if (b== null) {
throw new NullPointerException();
@@ -287,6 +288,7 @@ public class ZlibCompressor implements C
uncompressedDirectBufLen = uncompressedDirectBuf.position();
}
+ @Override
public synchronized void setDictionary(byte[] b, int off, int len) {
if (stream == 0 || b == null) {
throw new NullPointerException();
@@ -297,6 +299,7 @@ public class ZlibCompressor implements C
setDictionary(stream, b, off, len);
}
+ @Override
public synchronized boolean needsInput() {
// Consume remaining compressed data?
if (compressedDirectBuf.remaining() > 0) {
@@ -325,16 +328,19 @@ public class ZlibCompressor implements C
return false;
}
+ @Override
public synchronized void finish() {
finish = true;
}
+ @Override
public synchronized boolean finished() {
// Check if 'zlib' says its 'finished' and
// all compressed data has been consumed
return (finished && compressedDirectBuf.remaining() == 0);
}
+ @Override
public synchronized int compress(byte[] b, int off, int len)
throws IOException {
if (b == null) {
@@ -385,6 +391,7 @@ public class ZlibCompressor implements C
*
* @return the total (non-negative) number of compressed bytes output so far
*/
+ @Override
public synchronized long getBytesWritten() {
checkStream();
return getBytesWritten(stream);
@@ -395,11 +402,13 @@ public class ZlibCompressor implements C
*
* @return the total (non-negative) number of uncompressed bytes input so far
*/
+ @Override
public synchronized long getBytesRead() {
checkStream();
return getBytesRead(stream);
}
+ @Override
public synchronized void reset() {
checkStream();
reset(stream);
@@ -413,6 +422,7 @@ public class ZlibCompressor implements C
userBufOff = userBufLen = 0;
}
+ @Override
public synchronized void end() {
if (stream != 0) {
end(stream);
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java Fri Oct 19 02:25:55 2012
@@ -118,6 +118,7 @@ public class ZlibDecompressor implements
this(CompressionHeader.DEFAULT_HEADER, DEFAULT_DIRECT_BUFFER_SIZE);
}
+ @Override
public synchronized void setInput(byte[] b, int off, int len) {
if (b == null) {
throw new NullPointerException();
@@ -154,6 +155,7 @@ public class ZlibDecompressor implements
userBufLen -= compressedDirectBufLen;
}
+ @Override
public synchronized void setDictionary(byte[] b, int off, int len) {
if (stream == 0 || b == null) {
throw new NullPointerException();
@@ -165,6 +167,7 @@ public class ZlibDecompressor implements
needDict = false;
}
+ @Override
public synchronized boolean needsInput() {
// Consume remaining compressed data?
if (uncompressedDirectBuf.remaining() > 0) {
@@ -184,16 +187,19 @@ public class ZlibDecompressor implements
return false;
}
+ @Override
public synchronized boolean needsDictionary() {
return needDict;
}
+ @Override
public synchronized boolean finished() {
// Check if 'zlib' says it's 'finished' and
// all compressed data has been consumed
return (finished && uncompressedDirectBuf.remaining() == 0);
}
+ @Override
public synchronized int decompress(byte[] b, int off, int len)
throws IOException {
if (b == null) {
@@ -255,6 +261,7 @@ public class ZlibDecompressor implements
*
* @return the total (non-negative) number of unprocessed bytes in input
*/
+ @Override
public synchronized int getRemaining() {
checkStream();
return userBufLen + getRemaining(stream); // userBuf + compressedDirectBuf
@@ -263,6 +270,7 @@ public class ZlibDecompressor implements
/**
* Resets everything including the input buffers (user and direct).</p>
*/
+ @Override
public synchronized void reset() {
checkStream();
reset(stream);
@@ -274,6 +282,7 @@ public class ZlibDecompressor implements
userBufOff = userBufLen = 0;
}
+ @Override
public synchronized void end() {
if (stream != 0) {
end(stream);
@@ -281,6 +290,7 @@ public class ZlibDecompressor implements
}
}
+ @Override
protected void finalize() {
end();
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BCFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BCFile.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BCFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BCFile.java Fri Oct 19 02:25:55 2012
@@ -300,6 +300,7 @@ final class BCFile {
* Close the BCFile Writer. Attempting to use the Writer after calling
* <code>close</code> is not allowed and may lead to undetermined results.
*/
+ @Override
public void close() throws IOException {
if (closed == true) {
return;
@@ -447,6 +448,7 @@ final class BCFile {
this.compressAlgo = compressAlgo;
}
+ @Override
public void register(long raw, long begin, long end) {
metaIndex.addEntry(new MetaIndexEntry(name, compressAlgo,
new BlockRegion(begin, end - begin, raw)));
@@ -463,6 +465,7 @@ final class BCFile {
// do nothing
}
+ @Override
public void register(long raw, long begin, long end) {
dataIndex.addBlockRegion(new BlockRegion(begin, end - begin, raw));
}
@@ -671,6 +674,7 @@ final class BCFile {
/**
* Finishing reading the BCFile. Release all resources.
*/
+ @Override
public void close() {
// nothing to be done now
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/CompareUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/CompareUtils.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/CompareUtils.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/CompareUtils.java Fri Oct 19 02:25:55 2012
@@ -68,6 +68,7 @@ class CompareUtils {
magnitude = m;
}
+ @Override
public long magnitude() {
return magnitude;
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/TFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/TFile.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/TFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/TFile.java Fri Oct 19 02:25:55 2012
@@ -297,6 +297,7 @@ public class TFile {
*
* The underlying FSDataOutputStream is not closed.
*/
+ @Override
public void close() throws IOException {
if ((state == State.CLOSED)) {
return;
@@ -820,6 +821,7 @@ public class TFile {
* Close the reader. The state of the Reader object is undefined after
* close. Calling close() for multiple times has no effect.
*/
+ @Override
public void close() throws IOException {
readerBCF.close();
}
@@ -1573,6 +1575,7 @@ public class TFile {
* scanner after calling close is not defined. The entry returned by the
* previous entry() call will be invalid.
*/
+ @Override
public void close() throws IOException {
parkCursorAtEnd();
}
@@ -2102,7 +2105,7 @@ public class TFile {
}
public boolean isSorted() {
- return !strComparator.equals("");
+ return !strComparator.isEmpty();
}
public String getComparatorString() {
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/TFileDumper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/TFileDumper.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/TFileDumper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/TFileDumper.java Fri Oct 19 02:25:55 2012
@@ -125,7 +125,7 @@ class TFileDumper {
dataSizeUncompressed += region.getRawSize();
}
properties.put("Data Block Bytes", Long.toString(dataSize));
- if (reader.readerBCF.getDefaultCompressionName() != "none") {
+ if (!reader.readerBCF.getDefaultCompressionName().equals("none")) {
properties.put("Data Block Uncompressed Bytes", Long
.toString(dataSizeUncompressed));
properties.put("Data Block Compression Ratio", String.format(
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java Fri Oct 19 02:25:55 2012
@@ -202,6 +202,7 @@ public class NativeIO {
this.mode = mode;
}
+ @Override
public String toString() {
return "Stat(owner='" + owner + "', group='" + group + "'" +
", mode=" + mode + ")";
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIOException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIOException.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIOException.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIOException.java Fri Oct 19 02:25:55 2012
@@ -38,6 +38,7 @@ public class NativeIOException extends I
return errno;
}
+ @Override
public String toString() {
return errno.toString() + ": " + super.getMessage();
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java Fri Oct 19 02:25:55 2012
@@ -150,6 +150,7 @@ public class RetryPolicies {
}
static class TryOnceThenFail implements RetryPolicy {
+ @Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
boolean isMethodIdempotent) throws Exception {
return RetryAction.FAIL;
@@ -157,6 +158,7 @@ public class RetryPolicies {
}
static class RetryForever implements RetryPolicy {
+ @Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
boolean isMethodIdempotent) throws Exception {
return RetryAction.RETRY;
@@ -430,6 +432,7 @@ public class RetryPolicies {
this.exceptionToPolicyMap = exceptionToPolicyMap;
}
+ @Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
boolean isMethodIdempotent) throws Exception {
RetryPolicy policy = exceptionToPolicyMap.get(e.getClass());
@@ -457,6 +460,7 @@ public class RetryPolicies {
}
}
+ @Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
boolean isMethodIdempotent) throws Exception {
RetryPolicy policy = null;
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/DeserializerComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/DeserializerComparator.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/DeserializerComparator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/DeserializerComparator.java Fri Oct 19 02:25:55 2012
@@ -56,6 +56,7 @@ public abstract class DeserializerCompar
this.deserializer.open(buffer);
}
+ @Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/JavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/JavaSerialization.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/JavaSerialization.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/JavaSerialization.java Fri Oct 19 02:25:55 2012
@@ -24,11 +24,8 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
-import java.util.Map;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.RawComparator;
/**
* <p>
@@ -45,6 +42,7 @@ public class JavaSerialization implement
private ObjectInputStream ois;
+ @Override
public void open(InputStream in) throws IOException {
ois = new ObjectInputStream(in) {
@Override protected void readStreamHeader() {
@@ -53,6 +51,7 @@ public class JavaSerialization implement
};
}
+ @Override
@SuppressWarnings("unchecked")
public T deserialize(T object) throws IOException {
try {
@@ -63,6 +62,7 @@ public class JavaSerialization implement
}
}
+ @Override
public void close() throws IOException {
ois.close();
}
@@ -74,6 +74,7 @@ public class JavaSerialization implement
private ObjectOutputStream oos;
+ @Override
public void open(OutputStream out) throws IOException {
oos = new ObjectOutputStream(out) {
@Override protected void writeStreamHeader() {
@@ -82,27 +83,32 @@ public class JavaSerialization implement
};
}
+ @Override
public void serialize(Serializable object) throws IOException {
oos.reset(); // clear (class) back-references
oos.writeObject(object);
}
+ @Override
public void close() throws IOException {
oos.close();
}
}
+ @Override
@InterfaceAudience.Private
public boolean accept(Class<?> c) {
return Serializable.class.isAssignableFrom(c);
}
+ @Override
@InterfaceAudience.Private
public Deserializer<Serializable> getDeserializer(Class<Serializable> c) {
return new JavaSerializationDeserializer<Serializable>();
}
+ @Override
@InterfaceAudience.Private
public Serializer<Serializable> getSerializer(Class<Serializable> c) {
return new JavaSerializationSerializer();
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java Fri Oct 19 02:25:55 2012
@@ -44,6 +44,7 @@ public class JavaSerializationComparator
super(new JavaSerialization.JavaSerializationDeserializer<T>());
}
+ @Override
@InterfaceAudience.Private
public int compare(T o1, T o2) {
return o1.compareTo(o2);
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/SerializationFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/SerializationFactory.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/SerializationFactory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/SerializationFactory.java Fri Oct 19 02:25:55 2012
@@ -40,12 +40,12 @@ import org.apache.hadoop.util.Reflection
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public class SerializationFactory extends Configured {
-
- private static final Log LOG =
+
+ static final Log LOG =
LogFactory.getLog(SerializationFactory.class.getName());
private List<Serialization<?>> serializations = new ArrayList<Serialization<?>>();
-
+
/**
* <p>
* Serializations are found by reading the <code>io.serializations</code>
@@ -55,15 +55,21 @@ public class SerializationFactory extend
*/
public SerializationFactory(Configuration conf) {
super(conf);
- for (String serializerName : conf.getStrings(
- CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
- new String[]{WritableSerialization.class.getName(),
- AvroSpecificSerialization.class.getName(),
- AvroReflectSerialization.class.getName()})) {
- add(conf, serializerName);
+ if (conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).equals("")) {
+ LOG.warn("Serialization for various data types may not be available. Please configure "
+ + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY
+ + " properly to have serialization support (it is currently not set).");
+ } else {
+ for (String serializerName : conf.getStrings(
+ CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, new String[] {
+ WritableSerialization.class.getName(),
+ AvroSpecificSerialization.class.getName(),
+ AvroReflectSerialization.class.getName() })) {
+ add(conf, serializerName);
+ }
}
}
-
+
@SuppressWarnings("unchecked")
private void add(Configuration conf, String serializationName) {
try {
@@ -101,5 +107,5 @@ public class SerializationFactory extend
}
return null;
}
-
+
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/WritableSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/WritableSerialization.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/WritableSerialization.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/WritableSerialization.java Fri Oct 19 02:25:55 2012
@@ -23,8 +23,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.Map;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java Fri Oct 19 02:25:55 2012
@@ -47,11 +47,13 @@ public abstract class AvroSerialization<
@InterfaceAudience.Private
public static final String AVRO_SCHEMA_KEY = "Avro-Schema";
+ @Override
@InterfaceAudience.Private
public Deserializer<T> getDeserializer(Class<T> c) {
return new AvroDeserializer(c);
}
+ @Override
@InterfaceAudience.Private
public Serializer<T> getSerializer(Class<T> c) {
return new AvroSerializer(c);
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Fri Oct 19 02:25:55 2012
@@ -75,6 +75,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -82,6 +83,8 @@ import org.apache.hadoop.util.Reflection
*
* @see Server
*/
+@InterfaceAudience.LimitedPrivate(value = { "Common", "HDFS", "MapReduce", "Yarn" })
+@InterfaceStability.Evolving
public class Client {
public static final Log LOG = LogFactory.getLog(Client.class);
@@ -222,7 +225,6 @@ public class Client {
private IpcConnectionContextProto connectionContext; // connection context
private final ConnectionId remoteId; // connection id
private AuthMethod authMethod; // authentication method
- private boolean useSasl;
private Token<? extends TokenIdentifier> token;
private SaslRpcClient saslRpcClient;
@@ -267,8 +269,7 @@ public class Client {
UserGroupInformation ticket = remoteId.getTicket();
Class<?> protocol = remoteId.getProtocol();
- this.useSasl = UserGroupInformation.isSecurityEnabled();
- if (useSasl && protocol != null) {
+ if (protocol != null) {
TokenInfo tokenInfo = SecurityUtil.getTokenInfo(protocol, conf);
if (tokenInfo != null) {
TokenSelector<? extends TokenIdentifier> tokenSelector = null;
@@ -293,12 +294,12 @@ public class Client {
}
}
- if (!useSasl) {
- authMethod = AuthMethod.SIMPLE;
- } else if (token != null) {
+ if (token != null) {
authMethod = AuthMethod.DIGEST;
- } else {
+ } else if (UserGroupInformation.isSecurityEnabled()) {
authMethod = AuthMethod.KERBEROS;
+ } else {
+ authMethod = AuthMethod.SIMPLE;
}
connectionContext = ProtoUtil.makeIpcConnectionContext(
@@ -316,7 +317,7 @@ public class Client {
/** Update lastActivity with the current time. */
private void touch() {
- lastActivity.set(System.currentTimeMillis());
+ lastActivity.set(Time.now());
}
/**
@@ -363,6 +364,7 @@ public class Client {
* until a byte is read.
* @throws IOException for any IO problem other than socket timeout
*/
+ @Override
public int read() throws IOException {
do {
try {
@@ -379,6 +381,7 @@ public class Client {
*
* @return the total number of bytes read; -1 if the connection is closed.
*/
+ @Override
public int read(byte[] buf, int off, int len) throws IOException {
do {
try {
@@ -509,6 +512,7 @@ public class Client {
final Random rand, final UserGroupInformation ugi) throws IOException,
InterruptedException {
ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
public Object run() throws IOException, InterruptedException {
final short MAX_BACKOFF = 5000;
closeConnection();
@@ -570,14 +574,12 @@ public class Client {
InputStream inStream = NetUtils.getInputStream(socket);
OutputStream outStream = NetUtils.getOutputStream(socket);
writeConnectionHeader(outStream);
- if (useSasl) {
+ if (authMethod != AuthMethod.SIMPLE) {
final InputStream in2 = inStream;
final OutputStream out2 = outStream;
UserGroupInformation ticket = remoteId.getTicket();
- if (authMethod == AuthMethod.KERBEROS) {
- if (ticket.getRealUser() != null) {
- ticket = ticket.getRealUser();
- }
+ if (ticket.getRealUser() != null) {
+ ticket = ticket.getRealUser();
}
boolean continueSasl = false;
try {
@@ -608,7 +610,6 @@ public class Client {
connectionContext.getProtocol(),
ProtoUtil.getUgi(connectionContext.getUserInfo()),
authMethod);
- useSasl = false;
}
}
@@ -762,7 +763,7 @@ public class Client {
private synchronized boolean waitForWork() {
if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
long timeout = maxIdleTime-
- (System.currentTimeMillis()-lastActivity.get());
+ (Time.now()-lastActivity.get());
if (timeout>0) {
try {
wait(timeout);
@@ -792,7 +793,7 @@ public class Client {
* since last I/O activity is equal to or greater than the ping interval
*/
private synchronized void sendPing() throws IOException {
- long curTime = System.currentTimeMillis();
+ long curTime = Time.now();
if ( curTime - lastActivity.get() >= pingInterval) {
lastActivity.set(curTime);
synchronized (out) {
@@ -802,6 +803,7 @@ public class Client {
}
}
+ @Override
public void run() {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": starting, having connections "
@@ -1167,7 +1169,7 @@ public class Client {
call.error);
}
} else {
- return call.rpcResponse;
+ return call.getRpcResult();
}
}
}
@@ -1398,5 +1400,10 @@ public class Client {
result = PRIME * result + ((ticket == null) ? 0 : ticket.hashCode());
return result;
}
+
+ @Override
+ public String toString() {
+ return serverPrincipal + "@" + address;
+ }
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Fri Oct 19 02:25:55 2012
@@ -44,19 +44,21 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ProtoUtil;
+import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
/**
* RPC Engine for protobuf based RPCs.
*/
@InterfaceStability.Evolving
public class ProtobufRpcEngine implements RpcEngine {
- private static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
+ public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
static { // Register the rpcRequest deserializer for WritableRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine(
@@ -185,21 +187,34 @@ public class ProtobufRpcEngine implement
throws ServiceException {
long startTime = 0;
if (LOG.isDebugEnabled()) {
- startTime = System.currentTimeMillis();
+ startTime = Time.now();
}
HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args);
RpcResponseWritable val = null;
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(Thread.currentThread().getId() + ": Call -> " +
+ remoteId + ": " + method.getName() +
+ " {" + TextFormat.shortDebugString((Message) args[1]) + "}");
+ }
try {
val = (RpcResponseWritable) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcRequestWritable(rpcRequest), remoteId);
+
} catch (Throwable e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(Thread.currentThread().getId() + ": Exception <- " +
+ remoteId + ": " + method.getName() +
+ " {" + e + "}");
+ }
+
throw new ServiceException(e);
}
if (LOG.isDebugEnabled()) {
- long callTime = System.currentTimeMillis() - startTime;
- LOG.debug("Call: " + method.getName() + " " + callTime);
+ long callTime = Time.now() - startTime;
+ LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
}
Message prototype = null;
@@ -212,12 +227,20 @@ public class ProtobufRpcEngine implement
try {
returnMessage = prototype.newBuilderForType()
.mergeFrom(val.responseMessage).build();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(Thread.currentThread().getId() + ": Response <- " +
+ remoteId + ": " + method.getName() +
+ " {" + TextFormat.shortDebugString(returnMessage) + "}");
+ }
+
} catch (Throwable e) {
throw new ServiceException(e);
}
return returnMessage;
}
+ @Override
public void close() throws IOException {
if (!isClosed) {
isClosed = true;
@@ -446,10 +469,10 @@ public class ProtobufRpcEngine implement
.mergeFrom(rpcRequest.getRequest()).build();
Message result;
try {
- long startTime = System.currentTimeMillis();
+ long startTime = Time.now();
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
result = service.callBlockingMethod(methodDescriptor, null, param);
- int processingTime = (int) (System.currentTimeMillis() - startTime);
+ int processingTime = (int) (Time.now() - startTime);
int qTime = (int) (startTime - receiveTime);
if (LOG.isDebugEnabled()) {
LOG.info("Served: " + methodName + " queueTime= " + qTime +
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java Fri Oct 19 02:25:55 2012
@@ -19,7 +19,6 @@
package org.apache.hadoop.ipc;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashSet;
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java Fri Oct 19 02:25:55 2012
@@ -36,7 +36,8 @@ public class ProtocolSignature implement
WritableFactories.setFactory
(ProtocolSignature.class,
new WritableFactory() {
- public Writable newInstance() { return new ProtocolSignature(); }
+ @Override
+ public Writable newInstance() { return new ProtocolSignature(); }
});
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Fri Oct 19 02:25:55 2012
@@ -48,8 +48,11 @@ import org.apache.hadoop.security.SaslRp
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
import com.google.protobuf.BlockingService;
@@ -71,6 +74,8 @@ import com.google.protobuf.BlockingServi
* All methods in the protocol should throw only IOException. No field data of
* the protocol instance is transmitted.
*/
+@InterfaceAudience.LimitedPrivate(value = { "Common", "HDFS", "MapReduce", "Yarn" })
+@InterfaceStability.Evolving
public class RPC {
public enum RpcKind {
RPC_BUILTIN ((short) 1), // Used for built in calls by tests
@@ -369,7 +374,7 @@ public class RPC {
int rpcTimeout,
RetryPolicy connectionRetryPolicy,
long timeout) throws IOException {
- long startTime = System.currentTimeMillis();
+ long startTime = Time.now();
IOException ioe;
while (true) {
try {
@@ -387,7 +392,7 @@ public class RPC {
ioe = nrthe;
}
// check if timed out
- if (System.currentTimeMillis()-timeout >= startTime) {
+ if (Time.now()-timeout >= startTime) {
throw ioe;
}
@@ -628,7 +633,7 @@ public class RPC {
/** Construct a server for a protocol implementation instance listening on a
* port and address.
- * @deprecated protocol interface should be passed.
+ * @deprecated Please use {@link Builder} to build the {@link Server}
*/
@Deprecated
public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf)
@@ -638,7 +643,7 @@ public class RPC {
/** Construct a server for a protocol implementation instance listening on a
* port and address.
- * @deprecated protocol interface should be passed.
+ * @deprecated Please use {@link Builder} to build the {@link Server}
*/
@Deprecated
public static Server getServer(final Object instance, final String bindAddress, final int port,
@@ -650,7 +655,10 @@ public class RPC {
null);
}
- /** Construct a server for a protocol implementation instance. */
+ /** Construct a server for a protocol implementation instance.
+ * @deprecated Please use {@link Builder} to build the {@link Server}
+ */
+ @Deprecated
public static Server getServer(Class<?> protocol,
Object instance, String bindAddress,
int port, Configuration conf)
@@ -660,7 +668,7 @@ public class RPC {
}
/** Construct a server for a protocol implementation instance.
- * @deprecated secretManager should be passed.
+ * @deprecated Please use {@link Builder} to build the {@link Server}
*/
@Deprecated
public static Server getServer(Class<?> protocol,
@@ -673,7 +681,10 @@ public class RPC {
conf, null, null);
}
- /** Construct a server for a protocol implementation instance. */
+ /** Construct a server for a protocol implementation instance.
+ * @deprecated Please use {@link Builder} to build the {@link Server}
+ */
+ @Deprecated
public static Server getServer(Class<?> protocol,
Object instance, String bindAddress, int port,
int numHandlers,
@@ -684,6 +695,10 @@ public class RPC {
conf, secretManager, null);
}
+ /**
+ * @deprecated Please use {@link Builder} to build the {@link Server}
+ */
+ @Deprecated
public static Server getServer(Class<?> protocol,
Object instance, String bindAddress, int port,
int numHandlers,
@@ -696,8 +711,10 @@ public class RPC {
verbose, conf, secretManager, portRangeConfig);
}
- /** Construct a server for a protocol implementation instance. */
-
+ /** Construct a server for a protocol implementation instance.
+ * @deprecated Please use {@link Builder} to build the {@link Server}
+ */
+ @Deprecated
public static <PROTO extends VersionedProtocol, IMPL extends PROTO>
Server getServer(Class<PROTO> protocol,
IMPL instance, String bindAddress, int port,
@@ -712,6 +729,110 @@ public class RPC {
null);
}
+ /**
+ * Class to construct instances of RPC server with specific options.
+ */
+ public static class Builder {
+ private Class<?> protocol = null;
+ private Object instance = null;
+ private String bindAddress = "0.0.0.0";
+ private int port = 0;
+ private int numHandlers = 1;
+ private int numReaders = -1;
+ private int queueSizePerHandler = -1;
+ private boolean verbose = false;
+ private final Configuration conf;
+ private SecretManager<? extends TokenIdentifier> secretManager = null;
+ private String portRangeConfig = null;
+
+ public Builder(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /** Mandatory field */
+ public Builder setProtocol(Class<?> protocol) {
+ this.protocol = protocol;
+ return this;
+ }
+
+ /** Mandatory field */
+ public Builder setInstance(Object instance) {
+ this.instance = instance;
+ return this;
+ }
+
+ /** Default: 0.0.0.0 */
+ public Builder setBindAddress(String bindAddress) {
+ this.bindAddress = bindAddress;
+ return this;
+ }
+
+ /** Default: 0 */
+ public Builder setPort(int port) {
+ this.port = port;
+ return this;
+ }
+
+ /** Default: 1 */
+ public Builder setNumHandlers(int numHandlers) {
+ this.numHandlers = numHandlers;
+ return this;
+ }
+
+ /** Default: -1 */
+ public Builder setnumReaders(int numReaders) {
+ this.numReaders = numReaders;
+ return this;
+ }
+
+ /** Default: -1 */
+ public Builder setQueueSizePerHandler(int queueSizePerHandler) {
+ this.queueSizePerHandler = queueSizePerHandler;
+ return this;
+ }
+
+ /** Default: false */
+ public Builder setVerbose(boolean verbose) {
+ this.verbose = verbose;
+ return this;
+ }
+
+ /** Default: null */
+ public Builder setSecretManager(
+ SecretManager<? extends TokenIdentifier> secretManager) {
+ this.secretManager = secretManager;
+ return this;
+ }
+
+ /** Default: null */
+ public Builder setPortRangeConfig(String portRangeConfig) {
+ this.portRangeConfig = portRangeConfig;
+ return this;
+ }
+
+ /**
+ * Build the RPC Server.
+ * @throws IOException on error
+ * @throws HadoopIllegalArgumentException when mandatory fields are not set
+ */
+ public Server build() throws IOException, HadoopIllegalArgumentException {
+ if (this.conf == null) {
+ throw new HadoopIllegalArgumentException("conf is not set");
+ }
+ if (this.protocol == null) {
+ throw new HadoopIllegalArgumentException("protocol is not set");
+ }
+ if (this.instance == null) {
+ throw new HadoopIllegalArgumentException("instance is not set");
+ }
+
+ return getProtocolEngine(this.protocol, this.conf).getServer(
+ this.protocol, this.instance, this.bindAddress, this.port,
+ this.numHandlers, this.numReaders, this.queueSizePerHandler,
+ this.verbose, this.conf, this.secretManager, this.portRangeConfig);
+ }
+ }
+
/** An RPC Server. */
public abstract static class Server extends org.apache.hadoop.ipc.Server {
boolean verbose;
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Fri Oct 19 02:25:55 2012
@@ -46,11 +46,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
@@ -62,6 +64,7 @@ import javax.security.sasl.SaslServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -84,7 +87,6 @@ import org.apache.hadoop.security.SaslRp
import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.security.SaslRpcServer.SaslStatus;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
@@ -95,6 +97,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
@@ -104,9 +107,47 @@ import com.google.common.annotations.Vis
*
* @see Client
*/
+@InterfaceAudience.LimitedPrivate(value = { "Common", "HDFS", "MapReduce", "Yarn" })
+@InterfaceStability.Evolving
public abstract class Server {
private final boolean authorize;
private boolean isSecurityEnabled;
+ private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
+
+ public void addTerseExceptions(Class<?>... exceptionClass) {
+ exceptionsHandler.addTerseExceptions(exceptionClass);
+ }
+
+ /**
+ * ExceptionsHandler manages Exception groups for special handling
+ * e.g., terse exception group for concise logging messages
+ */
+ static class ExceptionsHandler {
+ private volatile Set<String> terseExceptions = new HashSet<String>();
+
+ /**
+ * Add exception class so server won't log its stack trace.
+ * Modifying the terseExceptions set through this method is thread safe.
+ *
+ * @param exceptionClass exception classes
+ */
+ void addTerseExceptions(Class<?>... exceptionClass) {
+
+ // Make a copy of terseExceptions for performing modification
+ final HashSet<String> newSet = new HashSet<String>(terseExceptions);
+
+ // Add all class names into the HashSet
+ for (Class<?> name : exceptionClass) {
+ newSet.add(name.toString());
+ }
+ // Replace the terseExceptions set
+ terseExceptions = Collections.unmodifiableSet(newSet);
+ }
+
+ boolean isTerse(Class<?> t) {
+ return terseExceptions.contains(t.toString());
+ }
+ }
/**
* The first four bytes of Hadoop RPC connections
@@ -411,7 +452,7 @@ public abstract class Server {
this.callId = id;
this.rpcRequest = param;
this.connection = connection;
- this.timestamp = System.currentTimeMillis();
+ this.timestamp = Time.now();
this.rpcResponse = null;
this.rpcKind = kind;
}
@@ -478,6 +519,7 @@ public abstract class Server {
this.readSelector = Selector.open();
}
+ @Override
public void run() {
LOG.info("Starting " + getName());
try {
@@ -561,7 +603,7 @@ public abstract class Server {
*/
private void cleanupConnections(boolean force) {
if (force || numConnections > thresholdIdleConnections) {
- long currentTime = System.currentTimeMillis();
+ long currentTime = Time.now();
if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
return;
}
@@ -597,7 +639,7 @@ public abstract class Server {
}
else i++;
}
- lastCleanupRunTime = System.currentTimeMillis();
+ lastCleanupRunTime = Time.now();
}
}
@@ -682,7 +724,7 @@ public abstract class Server {
try {
reader.startAdd();
SelectionKey readKey = reader.registerChannel(channel);
- c = new Connection(readKey, channel, System.currentTimeMillis());
+ c = new Connection(readKey, channel, Time.now());
readKey.attach(c);
synchronized (connectionList) {
connectionList.add(numConnections, c);
@@ -704,7 +746,7 @@ public abstract class Server {
if (c == null) {
return;
}
- c.setLastContact(System.currentTimeMillis());
+ c.setLastContact(Time.now());
try {
count = c.readAndProcess();
@@ -726,7 +768,7 @@ public abstract class Server {
c = null;
}
else {
- c.setLastContact(System.currentTimeMillis());
+ c.setLastContact(Time.now());
}
}
@@ -805,7 +847,7 @@ public abstract class Server {
LOG.info(getName() + ": doAsyncWrite threw exception " + e);
}
}
- long now = System.currentTimeMillis();
+ long now = Time.now();
if (now < lastPurgeTime + PURGE_INTERVAL) {
continue;
}
@@ -951,7 +993,7 @@ public abstract class Server {
if (inHandler) {
// set the serve time when the response has to be sent later
- call.timestamp = System.currentTimeMillis();
+ call.timestamp = Time.now();
incPending();
try {
@@ -1331,20 +1373,38 @@ public abstract class Server {
dataLengthBuffer.clear();
if (authMethod == null) {
throw new IOException("Unable to read authentication method");
- }
- if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
- AccessControlException ae = new AccessControlException("Authorization ("
- + CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION
- + ") is enabled but authentication ("
- + CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
- + ") is configured as simple. Please configure another method "
- + "like kerberos or digest.");
- setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL,
- null, ae.getClass().getName(), ae.getMessage());
- responder.doRespond(authFailedCall);
- throw ae;
- }
- if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
+ }
+ final boolean clientUsingSasl;
+ switch (authMethod) {
+ case SIMPLE: { // no sasl for simple
+ if (isSecurityEnabled) {
+ AccessControlException ae = new AccessControlException("Authorization ("
+ + CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION
+ + ") is enabled but authentication ("
+ + CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
+ + ") is configured as simple. Please configure another method "
+ + "like kerberos or digest.");
+ setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL,
+ null, ae.getClass().getName(), ae.getMessage());
+ responder.doRespond(authFailedCall);
+ throw ae;
+ }
+ clientUsingSasl = false;
+ useSasl = false;
+ break;
+ }
+ case DIGEST: {
+ clientUsingSasl = true;
+ useSasl = (secretManager != null);
+ break;
+ }
+ default: {
+ clientUsingSasl = true;
+ useSasl = isSecurityEnabled;
+ break;
+ }
+ }
+ if (clientUsingSasl && !useSasl) {
doSaslReply(SaslStatus.SUCCESS, new IntWritable(
SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
authMethod = AuthMethod.SIMPLE;
@@ -1353,9 +1413,6 @@ public abstract class Server {
// to simple auth from now on.
skipInitialSaslHandshake = true;
}
- if (authMethod != AuthMethod.SIMPLE) {
- useSasl = true;
- }
connectionHeaderBuf = null;
connectionHeaderRead = true;
@@ -1489,8 +1546,6 @@ public abstract class Server {
UserGroupInformation realUser = user;
user = UserGroupInformation.createProxyUser(protocolUser
.getUserName(), realUser);
- // Now the user is a proxy user, set Authentication method Proxy.
- user.setAuthenticationMethod(AuthenticationMethod.PROXY);
}
}
}
@@ -1642,7 +1697,7 @@ public abstract class Server {
if (!channel.isOpen())
return;
try {socket.shutdownOutput();} catch(Exception e) {
- LOG.warn("Ignoring socket shutdown exception");
+ LOG.debug("Ignoring socket shutdown exception", e);
}
if (channel.isOpen()) {
try {channel.close();} catch(Exception e) {}
@@ -1703,8 +1758,8 @@ public abstract class Server {
// on the server side, as opposed to just a normal exceptional
// result.
LOG.warn(logMsg, e);
- } else if (e instanceof StandbyException) {
- // Don't log the whole stack trace of these exceptions.
+ } else if (exceptionsHandler.isTerse(e.getClass())) {
+ // Don't log the whole stack trace of these exceptions.
// Way too noisy!
LOG.info(logMsg);
} else {
@@ -1840,9 +1895,11 @@ public abstract class Server {
// Create the responder here
responder = new Responder();
- if (isSecurityEnabled) {
+ if (secretManager != null) {
SaslRpcServer.init(conf);
}
+
+ this.exceptionsHandler.addTerseExceptions(StandbyException.class);
}
private void closeConnection(Connection connection) {
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Fri Oct 19 02:25:55 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.ipc.VersionedPr
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
@@ -141,6 +142,7 @@ public class WritableRpcEngine implement
return rpcVersion;
}
+ @Override
@SuppressWarnings("deprecation")
public void readFields(DataInput in) throws IOException {
rpcVersion = in.readLong();
@@ -158,6 +160,7 @@ public class WritableRpcEngine implement
}
}
+ @Override
@SuppressWarnings("deprecation")
public void write(DataOutput out) throws IOException {
out.writeLong(rpcVersion);
@@ -172,6 +175,7 @@ public class WritableRpcEngine implement
}
}
+ @Override
public String toString() {
StringBuilder buffer = new StringBuilder();
buffer.append(methodName);
@@ -188,10 +192,12 @@ public class WritableRpcEngine implement
return buffer.toString();
}
+ @Override
public void setConf(Configuration conf) {
this.conf = conf;
}
+ @Override
public Configuration getConf() {
return this.conf;
}
@@ -214,23 +220,25 @@ public class WritableRpcEngine implement
this.client = CLIENTS.getClient(conf, factory);
}
+ @Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
long startTime = 0;
if (LOG.isDebugEnabled()) {
- startTime = System.currentTimeMillis();
+ startTime = Time.now();
}
ObjectWritable value = (ObjectWritable)
client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
if (LOG.isDebugEnabled()) {
- long callTime = System.currentTimeMillis() - startTime;
+ long callTime = Time.now() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
return value.get();
}
/* close the IPC client that's responsible for this invoker's RPCs */
+ @Override
synchronized public void close() {
if (!isClosed) {
isClosed = true;
@@ -464,7 +472,7 @@ public class WritableRpcEngine implement
// Invoke the protocol method
- long startTime = System.currentTimeMillis();
+ long startTime = Time.now();
Method method =
protocolImpl.protocolClass.getMethod(call.getMethodName(),
call.getParameterClasses());
@@ -472,7 +480,7 @@ public class WritableRpcEngine implement
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
Object value =
method.invoke(protocolImpl.protocolImpl, call.getParameters());
- int processingTime = (int) (System.currentTimeMillis() - startTime);
+ int processingTime = (int) (Time.now() - startTime);
int qTime = (int) (startTime-receivedTime);
if (LOG.isDebugEnabled()) {
LOG.debug("Served: " + call.getMethodName() +
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/jmx/JMXJsonServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/jmx/JMXJsonServlet.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/jmx/JMXJsonServlet.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/jmx/JMXJsonServlet.java Fri Oct 19 02:25:55 2012
@@ -113,12 +113,17 @@ import org.codehaus.jackson.JsonGenerato
* All other objects will be converted to a string and output as such.
*
* The bean's name and modelerType will be returned for all beans.
+ *
+ * Optional parameter "callback" should be used to deliver JSONP response.
+ *
*/
public class JMXJsonServlet extends HttpServlet {
private static final Log LOG = LogFactory.getLog(JMXJsonServlet.class);
private static final long serialVersionUID = 1L;
+ private static final String CALLBACK_PARAM = "callback";
+
/**
* MBean server.
*/
@@ -154,11 +159,22 @@ public class JMXJsonServlet extends Http
return;
}
JsonGenerator jg = null;
+ String jsonpcb = null;
+ PrintWriter writer = null;
try {
- response.setContentType("application/json; charset=utf8");
+ writer = response.getWriter();
+
+ // "callback" parameter implies JSONP output
+ jsonpcb = request.getParameter(CALLBACK_PARAM);
+ if (jsonpcb != null) {
+ response.setContentType("application/javascript; charset=utf8");
+ writer.write(jsonpcb + "(");
+ } else {
+ response.setContentType("application/json; charset=utf8");
+ }
- PrintWriter writer = response.getWriter();
jg = jsonFactory.createJsonGenerator(writer);
+ jg.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
jg.useDefaultPrettyPrinter();
jg.writeStartObject();
@@ -188,6 +204,12 @@ public class JMXJsonServlet extends Http
if (jg != null) {
jg.close();
}
+ if (jsonpcb != null) {
+ writer.write(");");
+ }
+ if (writer != null) {
+ writer.close();
+ }
}
} catch (IOException e) {
LOG.error("Caught an exception while processing JMX request", e);
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogLevel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogLevel.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogLevel.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogLevel.java Fri Oct 19 02:25:55 2012
@@ -88,6 +88,7 @@ public class LogLevel {
public static class Servlet extends HttpServlet {
private static final long serialVersionUID = 1L;
+ @Override
public void doGet(HttpServletRequest request, HttpServletResponse response
) throws ServletException, IOException {