You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/09/28 00:51:13 UTC
svn commit: r1527113 - in
/hadoop/common/trunk/hadoop-common-project/hadoop-common/src:
main/java/org/apache/hadoop/fs/ main/java/org/apache/hadoop/io/
main/java/org/apache/hadoop/util/ test/java/org/apache/hadoop/util/
Author: cnauroth
Date: Fri Sep 27 22:51:12 2013
New Revision: 1527113
URL: http://svn.apache.org/r1527113
Log:
HDFS-5260. Merge zero-copy memory-mapped HDFS client reads to trunk and branch-2. Contributed by Chris Nauroth.
Added:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.ByteBufferPool;
+
+import com.google.common.base.Preconditions;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class ByteBufferUtil {
+
+  /** Static helpers only; never instantiated. */
+  private ByteBufferUtil() {
+  }
+
+  /**
+   * Determine if a stream can do a byte buffer read via
+   * {@code read(ByteBuffer buf)}.
+   *
+   * FSDataInputStream declares ByteBufferReadable unconditionally, so for
+   * such a stream it is the wrapped stream that gets checked.
+   */
+  private static boolean streamHasByteBufferRead(InputStream stream) {
+    if (!(stream instanceof ByteBufferReadable)) {
+      return false;
+    }
+    if (!(stream instanceof FSDataInputStream)) {
+      return true;
+    }
+    return ((FSDataInputStream) stream).getWrappedStream()
+        instanceof ByteBufferReadable;
+  }
+
+  /**
+   * Perform a fallback read: copy up to {@code maxLength} bytes from
+   * {@code stream} into a buffer obtained from {@code bufferPool}.
+   *
+   * A direct buffer is requested when the stream supports
+   * {@code read(ByteBuffer)}; otherwise a heap buffer is requested and
+   * filled through {@code read(byte[], int, int)}. In the direct case the
+   * stream is read repeatedly until the buffer is full or EOF is reached;
+   * in the heap case a single read call is made.
+   *
+   * @param stream     the stream to read from
+   * @param bufferPool pool supplying the buffer; must not be null
+   * @param maxLength  maximum number of bytes to read; further capped at
+   *                   the capacity of the buffer the pool returns
+   * @return a buffer with position 0 and limit equal to the number of bytes
+   *         read, or null if the stream was already at EOF. On success the
+   *         buffer is NOT returned to the pool; the caller must do that.
+   * @throws IOException if reading from the stream fails
+   * @throws UnsupportedOperationException if bufferPool is null or returns
+   *         a null buffer
+   * @throws IllegalStateException if the pool returns an empty buffer, or
+   *         one whose directness does not match the request
+   */
+  public static ByteBuffer fallbackRead(
+      InputStream stream, ByteBufferPool bufferPool, int maxLength)
+      throws IOException {
+    if (bufferPool == null) {
+      throw new UnsupportedOperationException("zero-copy reads " +
+          "were not available, and you did not provide a fallback " +
+          "ByteBufferPool.");
+    }
+    boolean useDirect = streamHasByteBufferRead(stream);
+    ByteBuffer buffer = bufferPool.getBuffer(useDirect, maxLength);
+    if (buffer == null) {
+      throw new UnsupportedOperationException("zero-copy reads " +
+          "were not available, and the ByteBufferPool did not provide " +
+          "us with " + (useDirect ? "a direct" : "an indirect") +
+          " buffer.");
+    }
+    boolean success = false;
+    try {
+      // Validate inside the try so that a buffer we reject is still handed
+      // back to the pool by the finally block below.
+      Preconditions.checkState(buffer.capacity() > 0);
+      Preconditions.checkState(buffer.isDirect() == useDirect);
+      final int readLength = Math.min(maxLength, buffer.capacity());
+      if (useDirect) {
+        buffer.clear();
+        buffer.limit(readLength);
+        ByteBufferReadable readable = (ByteBufferReadable) stream;
+        int totalRead = 0;
+        while (true) {
+          if (totalRead >= readLength) {
+            success = true;
+            break;
+          }
+          int nRead = readable.read(buffer);
+          if (nRead < 0) {
+            // EOF: a partial buffer is still a successful read; an empty
+            // one means we were already at EOF.
+            if (totalRead > 0) {
+              success = true;
+            }
+            break;
+          }
+          totalRead += nRead;
+        }
+        buffer.flip();
+      } else {
+        buffer.clear();
+        int nRead = stream.read(buffer.array(),
+            buffer.arrayOffset(), readLength);
+        if (nRead >= 0) {
+          buffer.limit(nRead);
+          success = true;
+        }
+      }
+    } finally {
+      if (!success) {
+        // If we got an error while reading, or if we are at EOF, we
+        // don't need the buffer any more. We can give it back to the
+        // bufferPool.
+        bufferPool.putBuffer(buffer);
+        buffer = null;
+      }
+    }
+    return buffer;
+  }
+}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java Fri Sep 27 22:51:12 2013
@@ -1,4 +1,5 @@
/**
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,17 +20,29 @@ package org.apache.hadoop.fs;
import java.io.*;
import java.nio.ByteBuffer;
+import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.fs.ByteBufferUtil;
+import org.apache.hadoop.util.IdentityHashStore;
/** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
* and buffers input through a {@link BufferedInputStream}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataInputStream extends DataInputStream
- implements Seekable, PositionedReadable, Closeable,
- ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead {
+ implements Seekable, PositionedReadable, Closeable,
+ ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
+ HasEnhancedByteBufferAccess {
+  /**
+   * ByteBuffers handed out by the fallback (non zero-copy) read path, each
+   * mapped to the ByteBufferPool it was taken from so that it can later be
+   * returned there. Keys are compared by identity, not by equals().
+   */
+  private final IdentityHashStore<ByteBuffer, ByteBufferPool>
+      extendedReadBuffers = new IdentityHashStore<ByteBuffer, ByteBufferPool>(0);
public FSDataInputStream(InputStream in)
throws IOException {
@@ -167,4 +180,45 @@ public class FSDataInputStream extends D
"support setting the drop-behind caching setting.");
}
}
+
+  /**
+   * Read up to {@code maxLength} bytes into a ByteBuffer.
+   *
+   * If the wrapped stream implements HasEnhancedByteBufferAccess, the call
+   * is delegated to it. Otherwise the data is copied into a buffer taken
+   * from {@code bufferPool}, and that buffer is recorded together with its
+   * pool so that releaseBuffer can hand it back.
+   *
+   * @param bufferPool pool used for the fallback buffer
+   * @param maxLength  maximum number of bytes to return
+   * @param opts       read options; only passed to the wrapped stream, the
+   *                   fallback path does not consult them
+   * @return the buffer, or null at EOF
+   * @throws IOException if the read fails
+   * @throws UnsupportedOperationException if a fallback buffer was needed
+   *         but could not be obtained
+   */
+  @Override
+  public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
+      EnumSet<ReadOption> opts)
+      throws IOException, UnsupportedOperationException {
+    // Test the type explicitly instead of catching ClassCastException: a
+    // ClassCastException thrown *inside* the wrapped stream's read() must
+    // propagate, not silently trigger a second (fallback) read.
+    if (in instanceof HasEnhancedByteBufferAccess) {
+      return ((HasEnhancedByteBufferAccess) in).read(bufferPool,
+          maxLength, opts);
+    }
+    ByteBuffer buffer =
+        ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
+    if (buffer != null) {
+      extendedReadBuffers.put(buffer, bufferPool);
+    }
+    return buffer;
+  }
+
+  /**
+   * Empty option set used by the two-argument read. One shared instance
+   * avoids allocating a set on every call; because EnumSet is mutable,
+   * nothing that receives it should modify it.
+   */
+  private static final EnumSet<ReadOption> NO_READ_OPTIONS =
+      EnumSet.noneOf(ReadOption.class);
+
+  /**
+   * Read up to {@code maxLength} bytes into a ByteBuffer with no
+   * ReadOptions; delegates to the three-argument read.
+   */
+  public final ByteBuffer read(ByteBufferPool bufferPool, int maxLength)
+      throws IOException, UnsupportedOperationException {
+    final EnumSet<ReadOption> opts = NO_READ_OPTIONS;
+    return read(bufferPool, maxLength, opts);
+  }
+
+ @Override
+ public void releaseBuffer(ByteBuffer buffer) {
+ try {
+ ((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer);
+ }
+ catch (ClassCastException e) {
+ ByteBufferPool bufferPool = extendedReadBuffers.remove( buffer);
+ if (bufferPool == null) {
+ throw new IllegalArgumentException("tried to release a buffer " +
+ "that was not created by this stream.");
+ }
+ bufferPool.putBuffer(buffer);
+ }
+ }
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java Fri Sep 27 22:51:12 2013
@@ -18,9 +18,11 @@
package org.apache.hadoop.fs;
import java.io.*;
+import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ZeroCopyUnavailableException;
/****************************************************************
* FSInputStream is a generic old InputStream with a little bit
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.ByteBufferPool;
+
+/**
+ * FSDataInputStreams implement this interface to provide enhanced
+ * byte buffer access. Usually this takes the form of mmap support.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface HasEnhancedByteBufferAccess {
+  /**
+   * Get a ByteBuffer containing file data.
+   *
+   * This ByteBuffer may come from the stream itself, via a call like mmap,
+   * or it may come from the ByteBufferPool which is passed in as an
+   * argument.
+   *
+   * @param factory
+   *          If this is non-null, it will be used to create a fallback
+   *          ByteBuffer when the stream itself cannot create one.
+   * @param maxLength
+   *          The maximum length of buffer to return. We may return a buffer
+   *          which is shorter than this.
+   * @param opts
+   *          Options to use when reading.
+   *
+   * @return
+   *          We will return null on EOF (and only on EOF).
+   *          Otherwise, we will return a ByteBuffer containing at least one
+   *          byte. A buffer obtained from the fallback pool is not
+   *          necessarily direct. You must free this ByteBuffer when you are
+   *          done with it by calling releaseBuffer on it.
+   *          The buffer will continue to be readable until it is released
+   *          in this manner. However, the input stream's close method may
+   *          warn about unclosed buffers.
+   * @throws IOException
+   *          if there was an error reading.
+   * @throws UnsupportedOperationException
+   *          if the stream could not supply a buffer itself and an external
+   *          one was needed, but factory was null or did not supply one.
+   */
+  ByteBuffer read(ByteBufferPool factory, int maxLength,
+      EnumSet<ReadOption> opts)
+      throws IOException, UnsupportedOperationException;
+
+  /**
+   * Release a ByteBuffer which was created by the enhanced ByteBuffer read
+   * function. You must not continue using the ByteBuffer after calling this
+   * function.
+   *
+   * @param buffer
+   *          The ByteBuffer to release.
+   */
+  void releaseBuffer(ByteBuffer buffer);
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Options that can be used when reading from a FileSystem.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public enum ReadOption {
+ /**
+ * Skip checksums when reading. This option may be useful when reading a file
+ * format that has built-in checksums, or for testing purposes.
+ */
+ SKIP_CHECKSUMS,
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+/**
+ * IOException reporting that zero-copy access is unavailable.
+ */
+public class ZeroCopyUnavailableException extends IOException {
+  private static final long serialVersionUID = 0L;
+
+  /**
+   * @param cause the underlying failure; it becomes the cause, and its
+   *              toString() becomes the detail message
+   */
+  public ZeroCopyUnavailableException(Exception cause) {
+    super(cause);
+  }
+
+  /**
+   * @param msg the detail message
+   */
+  public ZeroCopyUnavailableException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * @param msg   the detail message
+   * @param cause the underlying failure
+   */
+  public ZeroCopyUnavailableException(String msg, Exception cause) {
+    super(msg, cause);
+  }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A source of ByteBuffers that may be recycled via putBuffer.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public interface ByteBufferPool {
+  /**
+   * Get a new ByteBuffer. The pool can provide this from
+   * removing a buffer from its internal cache, or by allocating a
+   * new buffer.
+   *
+   * @param direct Whether the buffer should be direct.
+   * @param length The requested capacity of the buffer.
+   * @return A new ByteBuffer, direct if and only if {@code direct} is
+   *         true. Its capacity can be less than what was requested, but
+   *         must be at least 1 byte.
+   */
+  ByteBuffer getBuffer(boolean direct, int length);
+
+  /**
+   * Release a buffer back to the pool.
+   * The pool may choose to put this buffer into its cache.
+   *
+   * @param buffer the buffer to release; may be direct or non-direct
+   */
+  void putBuffer(ByteBuffer buffer);
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+import com.google.common.collect.ComparisonChain;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This is a simple ByteBufferPool which just creates ByteBuffers as needed.
+ * It also caches ByteBuffers after they're released. It will always return
+ * the smallest cached buffer with at least the capacity you request.
+ * Buffers taken from the cache are cleared (position 0, limit = capacity)
+ * before being returned.
+ * We don't try to do anything clever here like try to limit the maximum cache
+ * size.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public final class ElasticByteBufferPool implements ByteBufferPool {
+  /**
+   * Cache key: ordered by capacity, then by insertion time, so that cached
+   * buffers of equal capacity still get distinct keys.
+   */
+  private static final class Key implements Comparable<Key> {
+    private final int capacity;
+    private final long insertionTime;
+
+    Key(int capacity, long insertionTime) {
+      this.capacity = capacity;
+      this.insertionTime = insertionTime;
+    }
+
+    @Override
+    public int compareTo(Key other) {
+      return ComparisonChain.start().
+          compare(capacity, other.capacity).
+          compare(insertionTime, other.insertionTime).
+          result();
+    }
+
+    @Override
+    public boolean equals(Object rhs) {
+      // instanceof is false for null and for non-Key objects.
+      if (!(rhs instanceof Key)) {
+        return false;
+      }
+      return compareTo((Key) rhs) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder().
+          append(capacity).
+          append(insertionTime).
+          toHashCode();
+    }
+  }
+
+  /** Cached heap (non-direct) buffers. */
+  private final TreeMap<Key, ByteBuffer> buffers =
+      new TreeMap<Key, ByteBuffer>();
+
+  /** Cached direct buffers. */
+  private final TreeMap<Key, ByteBuffer> directBuffers =
+      new TreeMap<Key, ByteBuffer>();
+
+  private TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
+    return direct ? directBuffers : buffers;
+  }
+
+  /**
+   * Return the smallest cached buffer of the requested kind whose capacity
+   * is at least {@code length}, or allocate a new one of exactly
+   * {@code length} bytes if none is cached.
+   */
+  @Override
+  public synchronized ByteBuffer getBuffer(boolean direct, int length) {
+    TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
+    // Probe with the smallest possible insertion time. System.nanoTime()
+    // may be negative, so a probe time of 0 would sort after -- and thus
+    // skip -- cached buffers of exactly the requested capacity.
+    Map.Entry<Key, ByteBuffer> entry =
+        tree.ceilingEntry(new Key(length, Long.MIN_VALUE));
+    if (entry == null) {
+      return direct ? ByteBuffer.allocateDirect(length) :
+          ByteBuffer.allocate(length);
+    }
+    tree.remove(entry.getKey());
+    ByteBuffer buffer = entry.getValue();
+    // A recycled buffer keeps whatever position/limit its previous user
+    // left; reset them so every caller receives a buffer ready for writing.
+    buffer.clear();
+    return buffer;
+  }
+
+  /** Cache {@code buffer} in the tree matching its directness. */
+  @Override
+  public synchronized void putBuffer(ByteBuffer buffer) {
+    TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
+    while (true) {
+      Key key = new Key(buffer.capacity(), System.nanoTime());
+      if (!tree.containsKey(key)) {
+        tree.put(key, buffer);
+        return;
+      }
+      // Buffers are indexed by (capacity, time).
+      // If our key is not unique on the first try, we try again, since the
+      // time will be different. Since we use nanoseconds, it's pretty
+      // unlikely that we'll loop even once, unless the system clock has a
+      // poor granularity.
+    }
+  }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The IdentityHashStore stores (key, value) mappings in an array.
+ * It is similar to java.util.Hashtable, but much more lightweight.
+ * Neither inserting nor removing an element ever leads to any garbage
+ * getting created (assuming the array doesn't need to be enlarged).
+ *
+ * Unlike Hashtable, it compares keys using
+ * {@link System#identityHashCode(Object)} and the identity operator.
+ * This is useful for types like ByteBuffer which have expensive hashCode
+ * and equals operators.
+ *
+ * We use linear probing to resolve collisions. This avoids the need for
+ * the overhead of linked list data structures. Removal simply clears a
+ * slot, so a lookup cannot stop at the first empty slot; this also means it
+ * is expensive to attempt to remove an element that isn't there, since we
+ * have to look at the entire array to be sure that it doesn't exist.
+ *
+ * Null keys are not supported: put() rejects them, and looking up a null
+ * key finds nothing.
+ *
+ * @param <K> The key type to use.
+ * @param <V> The value type to use.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class IdentityHashStore<K, V> {
+  /**
+   * Even elements are keys; odd elements are values.
+   * Once allocated, the array has 4 * capacity slots: two per entry, with
+   * room for twice as many entries as capacity (load factor at most 0.5).
+   * Null until the first allocation.
+   */
+  private Object[] buffer;
+
+  /** Number of (key, value) pairs currently stored. */
+  private int numInserted = 0;
+
+  /**
+   * Number of pairs that may be stored before the array is doubled. Always
+   * a power of two once allocated; 0 before the first allocation.
+   */
+  private int capacity;
+
+  /**
+   * Capacity allocated by the first put() on a store that was constructed
+   * with a capacity of 0.
+   */
+  private static final int DEFAULT_MAX_CAPACITY = 2;
+
+  /**
+   * Create a store.
+   *
+   * @param capacity number of pairs to make room for up front, rounded up
+   *                 to a power of two; 0 defers allocation to the first
+   *                 put()
+   * @throws IllegalArgumentException if capacity is negative
+   */
+  public IdentityHashStore(int capacity) {
+    Preconditions.checkArgument(capacity >= 0);
+    if (capacity == 0) {
+      this.capacity = 0;
+      this.buffer = null;
+    } else {
+      realloc(roundUpToPowerOfTwo(capacity));
+    }
+  }
+
+  /**
+   * Smallest power of two that is >= n (n > 0), computed exactly with
+   * integer bit operations rather than floating-point logarithms.
+   */
+  private static int roundUpToPowerOfTwo(int n) {
+    int highest = Integer.highestOneBit(n);
+    return (highest == n) ? n : highest << 1;
+  }
+
+  /**
+   * Allocate a fresh array sized for newCapacity pairs and re-insert every
+   * existing pair into it.
+   */
+  private void realloc(int newCapacity) {
+    Preconditions.checkArgument(newCapacity > 0);
+    Object[] prevBuffer = buffer;
+    this.capacity = newCapacity;
+    // Each element takes two array slots -- one for the key,
+    // and another for the value. We also want a load factor
+    // of 0.50. Combine those together and you get 4 * newCapacity.
+    this.buffer = new Object[4 * newCapacity];
+    this.numInserted = 0;
+    if (prevBuffer != null) {
+      for (int i = 0; i < prevBuffer.length; i += 2) {
+        if (prevBuffer[i] != null) {
+          putInternal(prevBuffer[i], prevBuffer[i + 1]);
+        }
+      }
+    }
+  }
+
+  /**
+   * Starting probe position for key k in a table of numEntries positions.
+   * identityHashCode is not guaranteed to be non-negative; masking off the
+   * sign bit keeps the remainder, and therefore the array index, >= 0.
+   */
+  private static int startIndex(Object k, int numEntries) {
+    return (System.identityHashCode(k) & Integer.MAX_VALUE) % numEntries;
+  }
+
+  /** Store (k, v) at the first free position at or after k's start. */
+  private void putInternal(Object k, Object v) {
+    final int numEntries = buffer.length / 2;
+    int index = startIndex(k, numEntries);
+    // Terminates: put() grows the array before it is more than half full.
+    while (buffer[2 * index] != null) {
+      index = (index + 1) % numEntries;
+    }
+    buffer[2 * index] = k;
+    buffer[1 + (2 * index)] = v;
+    numInserted++;
+  }
+
+  /**
+   * Add a new (key, value) mapping.
+   *
+   * Inserting a new (key, value) never overwrites a previous one.
+   * In other words, you can insert the same key multiple times and it will
+   * lead to multiple entries.
+   *
+   * @throws NullPointerException if k is null
+   */
+  public void put(K k, V v) {
+    Preconditions.checkNotNull(k);
+    if (buffer == null) {
+      realloc(DEFAULT_MAX_CAPACITY);
+    } else if (numInserted + 1 > capacity) {
+      realloc(capacity * 2);
+    }
+    putInternal(k, v);
+  }
+
+  /**
+   * Position of an entry whose key is identical to k, or -1 if none.
+   * Scans the whole table in the worst case, because removals leave holes
+   * in probe sequences.
+   */
+  private int getElementIndex(K k) {
+    // put() never stores a null key. Without this check a null key would
+    // compare == to the first empty slot and be reported as present, and
+    // remove(null) would then wrongly decrement numInserted.
+    if (buffer == null || k == null) {
+      return -1;
+    }
+    final int numEntries = buffer.length / 2;
+    int index = startIndex(k, numEntries);
+    int firstIndex = index;
+    do {
+      if (buffer[2 * index] == k) {
+        return index;
+      }
+      index = (index + 1) % numEntries;
+    } while (index != firstIndex);
+    return -1;
+  }
+
+  /**
+   * Retrieve a value associated with a given key, or null if absent.
+   */
+  @SuppressWarnings("unchecked") // values stored by put(K, V) are Vs
+  public V get(K k) {
+    int index = getElementIndex(k);
+    if (index < 0) {
+      return null;
+    }
+    return (V) buffer[1 + (2 * index)];
+  }
+
+  /**
+   * Retrieve a value associated with a given key, and delete the
+   * relevant entry. Returns null if the key is absent.
+   */
+  @SuppressWarnings("unchecked") // values stored by put(K, V) are Vs
+  public V remove(K k) {
+    int index = getElementIndex(k);
+    if (index < 0) {
+      return null;
+    }
+    V val = (V) buffer[1 + (2 * index)];
+    buffer[2 * index] = null;
+    buffer[1 + (2 * index)] = null;
+    numInserted--;
+    return val;
+  }
+
+  /** True if no pairs are stored. */
+  public boolean isEmpty() {
+    return numInserted == 0;
+  }
+
+  /** Number of pairs currently stored. */
+  public int numElements() {
+    return numInserted;
+  }
+
+  /** Number of pairs that fit before the next enlargement. */
+  public int capacity() {
+    return capacity;
+  }
+
+  /** Callback invoked by visitAll for each stored pair. */
+  public interface Visitor<K, V> {
+    void accept(K k, V v);
+  }
+
+  /**
+   * Visit all key, value pairs in the IdentityHashStore.
+   */
+  @SuppressWarnings("unchecked") // keys/values stored by put(K, V)
+  public void visitAll(Visitor<K, V> visitor) {
+    int length = buffer == null ? 0 : buffer.length;
+    for (int i = 0; i < length; i += 2) {
+      if (buffer[i] != null) {
+        visitor.accept((K) buffer[i], (V) buffer[i + 1]);
+      }
+    }
+  }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java?rev=1527113&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java Fri Sep 27 22:51:12 2013
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.IdentityHashStore;
+import org.apache.hadoop.util.IdentityHashStore.Visitor;
+import org.junit.Test;
+
+public class TestIdentityHashStore {
+  private static final Log LOG = LogFactory.getLog(TestIdentityHashStore.class.getName());
+
+  /**
+   * Test key whose hashCode() throws, proving that IdentityHashStore never
+   * calls it. equals() compares names, which lets the tests show that the
+   * store matches keys by identity rather than by equality.
+   */
+  private static class Key {
+    private final String name;
+
+    Key(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public int hashCode() {
+      throw new RuntimeException("should not be used!");
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof Key)) {
+        return false;
+      }
+      Key other = (Key)o;
+      return name.equals(other.name);
+    }
+
+    /**
+     * Object.toString() calls hashCode(), which throws here. Without this
+     * override, any failure message that mentions a Key would be replaced
+     * by the "should not be used!" exception.
+     */
+    @Override
+    public String toString() {
+      return "Key(" + name + ")";
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testStartingWithZeroCapacity() {
+    IdentityHashStore<Key, Integer> store =
+        new IdentityHashStore<Key, Integer>(0);
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        Assert.fail("found key " + k + " in empty IdentityHashStore.");
+      }
+    });
+    Assert.assertTrue(store.isEmpty());
+    final Key key1 = new Key("key1");
+    Integer value1 = Integer.valueOf(100);
+    store.put(key1, value1);
+    Assert.assertFalse(store.isEmpty());
+    Assert.assertEquals(value1, store.get(key1));
+    // Record what the visitor sees so we can check that it actually ran.
+    final List<Key> visited = new ArrayList<Key>();
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        visited.add(k);
+      }
+    });
+    Assert.assertEquals(1, visited.size());
+    Assert.assertSame(key1, visited.get(0));
+    Assert.assertEquals(value1, store.remove(key1));
+    Assert.assertTrue(store.isEmpty());
+  }
+
+  @Test(timeout=60000)
+  public void testDuplicateInserts() {
+    IdentityHashStore<Key, Integer> store =
+        new IdentityHashStore<Key, Integer>(4);
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        Assert.fail("found key " + k + " in empty IdentityHashStore.");
+      }
+    });
+    Assert.assertTrue(store.isEmpty());
+    Key key1 = new Key("key1");
+    Integer value1 = Integer.valueOf(100);
+    Integer value2 = Integer.valueOf(200);
+    Integer value3 = Integer.valueOf(300);
+    store.put(key1, value1);
+    Key equalToKey1 = new Key("key1");
+
+    // IdentityHashStore compares keys by reference identity (==), not equals()
+    Assert.assertNull(store.get(equalToKey1));
+
+    Assert.assertFalse(store.isEmpty());
+    Assert.assertEquals(value1, store.get(key1));
+    store.put(key1, value2);
+    store.put(key1, value3);
+    final List<Integer> allValues = new LinkedList<Integer>();
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        allValues.add(v);
+      }
+    });
+    Assert.assertEquals(3, allValues.size());
+    for (int i = 0; i < 3; i++) {
+      Integer value = store.remove(key1);
+      Assert.assertTrue(allValues.remove(value));
+    }
+    Assert.assertNull(store.remove(key1));
+    Assert.assertTrue(store.isEmpty());
+  }
+
+  @Test(timeout=60000)
+  public void testAdditionsAndRemovals() {
+    IdentityHashStore<Key, Integer> store =
+        new IdentityHashStore<Key, Integer>(0);
+    final int NUM_KEYS = 1000;
+    LOG.debug("generating " + NUM_KEYS + " keys");
+    final List<Key> keys = new ArrayList<Key>(NUM_KEYS);
+    for (int i = 0; i < NUM_KEYS; i++) {
+      keys.add(new Key("key " + i));
+    }
+    for (int i = 0; i < NUM_KEYS; i++) {
+      store.put(keys.get(i), i);
+    }
+    Assert.assertEquals(NUM_KEYS, store.numElements());
+    final List<Key> visited = new ArrayList<Key>(NUM_KEYS);
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        // Each value is the index of its key, so this checks the key/value
+        // pairing by identity in O(1) instead of scanning the key list.
+        Assert.assertSame(keys.get(v.intValue()), k);
+        visited.add(k);
+      }
+    });
+    Assert.assertEquals(NUM_KEYS, visited.size());
+    for (int i = 0; i < NUM_KEYS; i++) {
+      Assert.assertEquals(Integer.valueOf(i),
+          store.remove(keys.get(i)));
+    }
+    store.visitAll(new Visitor<Key, Integer>() {
+      @Override
+      public void accept(Key k, Integer v) {
+        Assert.fail("expected all entries to be removed");
+      }
+    });
+    Assert.assertTrue("expected the store to be " +
+        "empty, but found " + store.numElements() + " elements.",
+        store.isEmpty());
+    Assert.assertEquals(1024, store.capacity());
+  }
+
+}