Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:38 UTC
[31/50] [abbrv] Rename tez-engine-api to tez-runtime-api and split
tez-engine into two modules:
- tez-engine-library for user-visible Input/Output/Processor implementations
- tez-engine-internals for framework internals
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
new file mode 100644
index 0000000..4ce82d5
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -0,0 +1,559 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.BufferUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.tez.common.counters.TezCounter;
+
+/**
+ * <code>IFile</code> is the simple <key-len, value-len, key, value> format
+ * for the intermediate map-outputs in Map-Reduce.
+ *
+ * There is a <code>Writer</code> to write out map-outputs in this format and
+ * a <code>Reader</code> to read files of this format.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class IFile {
+ private static final Log LOG = LogFactory.getLog(IFile.class);
+ public static final int EOF_MARKER = -1; // End of File Marker
+ public static final int RLE_MARKER = -2; // Repeat same key marker
+ public static final DataInputBuffer REPEAT_KEY = new DataInputBuffer();
+
+ /**
+ * <code>IFile.Writer</code> to write out intermediate map-outputs.
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public static class Writer {
+ FSDataOutputStream out;
+ boolean ownOutputStream = false;
+ long start = 0;
+ FSDataOutputStream rawOut;
+ AtomicBoolean closed = new AtomicBoolean(false);
+
+ CompressionOutputStream compressedOut;
+ Compressor compressor;
+ boolean compressOutput = false;
+
+ long decompressedBytesWritten = 0;
+ long compressedBytesWritten = 0;
+
+ // Count records written to disk
+ private long numRecordsWritten = 0;
+ private final TezCounter writtenRecordsCounter;
+
+ IFileOutputStream checksumOut;
+
+ Class keyClass;
+ Class valueClass;
+ Serializer keySerializer;
+ Serializer valueSerializer;
+
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ DataOutputBuffer previous = new DataOutputBuffer();
+
+ // de-dup keys or not
+ private boolean rle = false;
+
+ public Writer(Configuration conf, FileSystem fs, Path file,
+ Class keyClass, Class valueClass,
+ CompressionCodec codec,
+ TezCounter writesCounter) throws IOException {
+ this(conf, fs.create(file), keyClass, valueClass, codec,
+ writesCounter);
+ ownOutputStream = true;
+ }
+
+ protected Writer(TezCounter writesCounter) {
+ writtenRecordsCounter = writesCounter;
+ }
+
+ public Writer(Configuration conf, FSDataOutputStream out,
+ Class keyClass, Class valueClass,
+ CompressionCodec codec, TezCounter writesCounter)
+ throws IOException {
+ this.writtenRecordsCounter = writesCounter;
+ this.checksumOut = new IFileOutputStream(out);
+ this.rawOut = out;
+ this.start = this.rawOut.getPos();
+ if (codec != null) {
+ this.compressor = CodecPool.getCompressor(codec);
+ if (this.compressor != null) {
+ this.compressor.reset();
+ this.compressedOut = codec.createOutputStream(checksumOut, compressor);
+ this.out = new FSDataOutputStream(this.compressedOut, null);
+ this.compressOutput = true;
+ } else {
+ LOG.warn("Could not obtain compressor from CodecPool");
+ this.out = new FSDataOutputStream(checksumOut,null);
+ }
+ } else {
+ this.out = new FSDataOutputStream(checksumOut,null);
+ }
+
+ this.keyClass = keyClass;
+ this.valueClass = valueClass;
+
+ if (keyClass != null) {
+ SerializationFactory serializationFactory =
+ new SerializationFactory(conf);
+ this.keySerializer = serializationFactory.getSerializer(keyClass);
+ this.keySerializer.open(buffer);
+ this.valueSerializer = serializationFactory.getSerializer(valueClass);
+ this.valueSerializer.open(buffer);
+ }
+ }
+
+ public Writer(Configuration conf, FileSystem fs, Path file)
+ throws IOException {
+ this(conf, fs, file, null, null, null, null);
+ }
+
+ public void close() throws IOException {
+ if (closed.getAndSet(true)) {
+ throw new IOException("Writer was already closed earlier");
+ }
+
+ // When IFile writer is created by BackupStore, we do not have
+ // Key and Value classes set. So, check before closing the
+ // serializers
+ if (keyClass != null) {
+ keySerializer.close();
+ valueSerializer.close();
+ }
+
+ // Write EOF_MARKER for key/value length
+ WritableUtils.writeVInt(out, EOF_MARKER);
+ WritableUtils.writeVInt(out, EOF_MARKER);
+ decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
+
+ //Flush the stream
+ out.flush();
+
+ if (compressOutput) {
+ // Flush
+ compressedOut.finish();
+ compressedOut.resetState();
+ }
+
+ // Close the underlying stream iff we own it...
+ if (ownOutputStream) {
+ out.close();
+ }
+ else {
+ // Write the checksum
+ checksumOut.finish();
+ }
+
+ compressedBytesWritten = rawOut.getPos() - start;
+
+ if (compressOutput) {
+ // Return back the compressor
+ CodecPool.returnCompressor(compressor);
+ compressor = null;
+ }
+
+ out = null;
+ if(writtenRecordsCounter != null) {
+ writtenRecordsCounter.increment(numRecordsWritten);
+ }
+ }
+
+ public void append(Object key, Object value) throws IOException {
+ if (key.getClass() != keyClass)
+ throw new IOException("wrong key class: "+ key.getClass()
+ +" is not "+ keyClass);
+ if (value.getClass() != valueClass)
+ throw new IOException("wrong value class: "+ value.getClass()
+ +" is not "+ valueClass);
+
+ boolean sameKey = false;
+
+ // Append the 'key'
+ keySerializer.serialize(key);
+ int keyLength = buffer.getLength();
+ if (keyLength < 0) {
+ throw new IOException("Negative key-length not allowed: " + keyLength +
+ " for " + key);
+ }
+
+ if(keyLength == previous.getLength()) {
+ sameKey = (BufferUtils.compare(previous, buffer) == 0);
+ }
+
+ if(!sameKey) {
+ BufferUtils.copy(buffer, previous);
+ }
+
+ // Append the 'value'
+ valueSerializer.serialize(value);
+ int valueLength = buffer.getLength() - keyLength;
+ if (valueLength < 0) {
+ throw new IOException("Negative value-length not allowed: " +
+ valueLength + " for " + value);
+ }
+
+ if(sameKey) {
+ WritableUtils.writeVInt(out, RLE_MARKER); // Same key as previous
+ WritableUtils.writeVInt(out, valueLength); // value length
+ out.write(buffer.getData(), keyLength, buffer.getLength() - keyLength); // only the value
+ // Update bytes written
+ decompressedBytesWritten += 0 + valueLength +
+ WritableUtils.getVIntSize(RLE_MARKER) +
+ WritableUtils.getVIntSize(valueLength);
+ } else {
+ // Write the record out
+ WritableUtils.writeVInt(out, keyLength); // key length
+ WritableUtils.writeVInt(out, valueLength); // value length
+ out.write(buffer.getData(), 0, buffer.getLength()); // data
+ // Update bytes written
+ decompressedBytesWritten += keyLength + valueLength +
+ WritableUtils.getVIntSize(keyLength) +
+ WritableUtils.getVIntSize(valueLength);
+ }
+
+ // Reset
+ buffer.reset();
+
+
+ ++numRecordsWritten;
+ }
+
+ public void append(DataInputBuffer key, DataInputBuffer value)
+ throws IOException {
+ int keyLength = key.getLength() - key.getPosition();
+ if (keyLength < 0) {
+ throw new IOException("Negative key-length not allowed: " + keyLength +
+ " for " + key);
+ }
+
+ int valueLength = value.getLength() - value.getPosition();
+ if (valueLength < 0) {
+ throw new IOException("Negative value-length not allowed: " +
+ valueLength + " for " + value);
+ }
+
+ boolean sameKey = false;
+
+ if(rle && keyLength == previous.getLength()) {
+ sameKey = (keyLength != 0) && (BufferUtils.compare(previous, key) == 0);
+ }
+
+ if(rle && sameKey) {
+ WritableUtils.writeVInt(out, RLE_MARKER);
+ WritableUtils.writeVInt(out, valueLength);
+ out.write(value.getData(), value.getPosition(), valueLength);
+
+ // Update bytes written
+ decompressedBytesWritten += 0 + valueLength
+ + WritableUtils.getVIntSize(RLE_MARKER)
+ + WritableUtils.getVIntSize(valueLength);
+ } else {
+ WritableUtils.writeVInt(out, keyLength);
+ WritableUtils.writeVInt(out, valueLength);
+ out.write(key.getData(), key.getPosition(), keyLength);
+ out.write(value.getData(), value.getPosition(), valueLength);
+
+ // Update bytes written
+ decompressedBytesWritten += keyLength + valueLength
+ + WritableUtils.getVIntSize(keyLength)
+ + WritableUtils.getVIntSize(valueLength);
+
+ BufferUtils.copy(key, previous);
+ }
+ ++numRecordsWritten;
+ }
+
+ // Required for mark/reset
+ public DataOutputStream getOutputStream () {
+ return out;
+ }
+
+ // Required for mark/reset
+ public void updateCountersForExternalAppend(long length) {
+ ++numRecordsWritten;
+ decompressedBytesWritten += length;
+ }
+
+ public long getRawLength() {
+ return decompressedBytesWritten;
+ }
+
+ public long getCompressedLength() {
+ return compressedBytesWritten;
+ }
+
+ public void setRLE(boolean rle) {
+ this.rle = rle;
+ previous.reset();
+ }
+
+ }
+
+ /**
+ * <code>IFile.Reader</code> to read intermediate map-outputs.
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public static class Reader {
+
+ public enum KeyState {NO_KEY, NEW_KEY, SAME_KEY};
+
+ private static final int DEFAULT_BUFFER_SIZE = 128*1024;
+
+ // Count records read from disk
+ private long numRecordsRead = 0;
+ private final TezCounter readRecordsCounter;
+
+ final InputStream in; // Possibly decompressed stream that we read
+ Decompressor decompressor;
+ public long bytesRead = 0;
+ protected final long fileLength;
+ protected boolean eof = false;
+ final IFileInputStream checksumIn;
+
+ protected byte[] buffer = null;
+ protected int bufferSize = DEFAULT_BUFFER_SIZE;
+ protected DataInputStream dataIn;
+
+ protected int recNo = 1;
+ protected int prevKeyLength;
+ protected int currentKeyLength;
+ protected int currentValueLength;
+ byte keyBytes[] = new byte[0];
+
+
+ /**
+ * Construct an IFile Reader.
+ *
+ * @param conf Configuration
+ * @param fs FileSystem
+ * @param file Path of the file to be opened. This file should have
+ * checksum bytes for the data at the end of the file.
+ * @param codec codec
+ * @param readsCounter Counter for records read from disk
+ * @throws IOException
+ */
+ public Reader(Configuration conf, FileSystem fs, Path file,
+ CompressionCodec codec,
+ TezCounter readsCounter) throws IOException {
+ this(conf, fs.open(file),
+ fs.getFileStatus(file).getLen(),
+ codec, readsCounter);
+ }
+
+ /**
+ * Construct an IFile Reader.
+ *
+ * @param conf Configuration
+ * @param in The input stream
+ * @param length Length of the data in the stream, including the checksum
+ * bytes.
+ * @param codec codec
+ * @param readsCounter Counter for records read from disk
+ * @throws IOException
+ */
+ public Reader(Configuration conf, InputStream in, long length,
+ CompressionCodec codec,
+ TezCounter readsCounter) throws IOException {
+ readRecordsCounter = readsCounter;
+ checksumIn = new IFileInputStream(in,length, conf);
+ if (codec != null) {
+ decompressor = CodecPool.getDecompressor(codec);
+ if (decompressor != null) {
+ this.in = codec.createInputStream(checksumIn, decompressor);
+ } else {
+ LOG.warn("Could not obtain decompressor from CodecPool");
+ this.in = checksumIn;
+ }
+ } else {
+ this.in = checksumIn;
+ }
+ this.dataIn = new DataInputStream(this.in);
+ this.fileLength = length;
+
+ if (conf != null) {
+ bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+ }
+ }
+
+ public long getLength() {
+ return fileLength - checksumIn.getSize();
+ }
+
+ public long getPosition() throws IOException {
+ return checksumIn.getPosition();
+ }
+
+ /**
+ * Read up to len bytes into buf starting at offset off.
+ *
+ * @param buf buffer
+ * @param off offset
+ * @param len length of buffer
+ * @return the number of bytes read
+ * @throws IOException
+ */
+ private int readData(byte[] buf, int off, int len) throws IOException {
+ int bytesRead = 0;
+ while (bytesRead < len) {
+ int n = IOUtils.wrappedReadForCompressedData(in, buf, off + bytesRead,
+ len - bytesRead);
+ if (n < 0) {
+ return bytesRead;
+ }
+ bytesRead += n;
+ }
+ return len;
+ }
+
+ protected boolean positionToNextRecord(DataInput dIn) throws IOException {
+ // Sanity check
+ if (eof) {
+ throw new EOFException("Completed reading " + bytesRead);
+ }
+
+ // Read key and value lengths
+ prevKeyLength = currentKeyLength;
+ currentKeyLength = WritableUtils.readVInt(dIn);
+ currentValueLength = WritableUtils.readVInt(dIn);
+ bytesRead += WritableUtils.getVIntSize(currentKeyLength) +
+ WritableUtils.getVIntSize(currentValueLength);
+
+ // Check for EOF
+ if (currentKeyLength == EOF_MARKER && currentValueLength == EOF_MARKER) {
+ eof = true;
+ return false;
+ }
+
+ // Sanity check
+ if (currentKeyLength != RLE_MARKER && currentKeyLength < 0) {
+ throw new IOException("Rec# " + recNo + ": Negative key-length: " +
+ currentKeyLength);
+ }
+ if (currentValueLength < 0) {
+ throw new IOException("Rec# " + recNo + ": Negative value-length: " +
+ currentValueLength);
+ }
+
+ return true;
+ }
+
+ public boolean nextRawKey(DataInputBuffer key) throws IOException {
+ return readRawKey(key) != KeyState.NO_KEY;
+ }
+
+ public KeyState readRawKey(DataInputBuffer key) throws IOException {
+ if (!positionToNextRecord(dataIn)) {
+ return KeyState.NO_KEY;
+ }
+ if(currentKeyLength == RLE_MARKER) {
+ currentKeyLength = prevKeyLength;
+ // no data to read
+ key.reset(keyBytes, currentKeyLength);
+ return KeyState.SAME_KEY;
+ }
+ if (keyBytes.length < currentKeyLength) {
+ keyBytes = new byte[currentKeyLength << 1];
+ }
+ int i = readData(keyBytes, 0, currentKeyLength);
+ if (i != currentKeyLength) {
+ throw new IOException ("Asked for " + currentKeyLength + " Got: " + i);
+ }
+ key.reset(keyBytes, currentKeyLength);
+ bytesRead += currentKeyLength;
+ return KeyState.NEW_KEY;
+ }
+
+ public void nextRawValue(DataInputBuffer value) throws IOException {
+ final byte[] valBytes =
+ ((value.getData().length < currentValueLength) || (value.getData() == keyBytes))
+ ? new byte[currentValueLength << 1]
+ : value.getData();
+ int i = readData(valBytes, 0, currentValueLength);
+ if (i != currentValueLength) {
+ throw new IOException ("Asked for " + currentValueLength + " Got: " + i);
+ }
+ value.reset(valBytes, currentValueLength);
+
+ // Record the bytes read
+ bytesRead += currentValueLength;
+
+ ++recNo;
+ ++numRecordsRead;
+ }
+
+ public void close() throws IOException {
+ // Close the underlying stream
+ in.close();
+
+ // Release the buffer
+ dataIn = null;
+ buffer = null;
+ if(readRecordsCounter != null) {
+ readRecordsCounter.increment(numRecordsRead);
+ }
+
+ // Return the decompressor
+ if (decompressor != null) {
+ decompressor.reset();
+ CodecPool.returnDecompressor(decompressor);
+ decompressor = null;
+ }
+ }
+
+ public void reset(int offset) {
+ return;
+ }
+
+ public void disableChecksumValidation() {
+ checksumIn.disableChecksumValidation();
+ }
+
+ }
+}
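
For orientation, the Writer/Reader pair above can be exercised end to end
roughly as follows. This is a sketch, not part of the commit, and it assumes
a local FileSystem, Text keys and values, and null for the optional codec
and counter arguments (which the constructors accept):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.Text;
    import org.apache.tez.runtime.library.common.sort.impl.IFile;

    public class IFileRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path file = new Path("/tmp/example.ifile");   // illustrative path

        // Writer appends <key-len, value-len, key, value> records; close()
        // writes the EOF_MARKER pair and the trailing file checksum.
        IFile.Writer writer = new IFile.Writer(conf, fs, file,
            Text.class, Text.class, null /* codec */, null /* counter */);
        writer.append(new Text("apple"), new Text("1"));
        writer.append(new Text("apple"), new Text("2")); // RLE candidate
        writer.close();

        // Reader hands back raw serialized bytes; nextRawKey() returns
        // false once the EOF_MARKER is seen.
        IFile.Reader reader = new IFile.Reader(conf, fs, file,
            null /* codec */, null /* counter */);
        DataInputBuffer key = new DataInputBuffer();
        DataInputBuffer value = new DataInputBuffer();
        while (reader.nextRawKey(key)) {
          reader.nextRawValue(value);
          // bytes live between position() and length() of each buffer
        }
        reader.close();
      }
    }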
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
new file mode 100644
index 0000000..e828c0b
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.HasFileDescriptor;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.tez.common.TezJobConfig;
+/**
+ * A checksum input stream, used for IFiles.
+ * Used to validate the checksum of files created by {@link IFileOutputStream}.
+*/
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class IFileInputStream extends InputStream {
+
+ private final InputStream in; //The input stream to be verified for checksum.
+ private final FileDescriptor inFd; // the file descriptor, if it is known
+ private final long length; //The total length of the input file
+ private final long dataLength;
+ private DataChecksum sum;
+ private long currentOffset = 0;
+ private final byte b[] = new byte[1];
+ private byte csum[] = null;
+ private int checksumSize;
+ private byte[] buffer;
+ private int offset;
+
+ private ReadaheadRequest curReadahead = null;
+ private ReadaheadPool raPool = ReadaheadPool.getInstance();
+ private boolean readahead;
+ private int readaheadLength;
+
+ public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
+
+ private boolean disableChecksumValidation = false;
+
+ /**
+ * Create a checksum input stream that reads and validates the given stream.
+ * @param in The input stream to be verified for checksum.
+ * @param len The length of the input stream including checksum bytes.
+ * @param conf Configuration used to look up the readahead settings.
+ */
+ public IFileInputStream(InputStream in, long len, Configuration conf) {
+ this.in = in;
+ this.inFd = getFileDescriptorIfAvail(in);
+ sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
+ Integer.MAX_VALUE);
+ checksumSize = sum.getChecksumSize();
+ buffer = new byte[4096];
+ offset = 0;
+ length = len;
+ dataLength = length - checksumSize;
+
+ conf = (conf != null) ? conf : new Configuration();
+ readahead = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_IFILE_READAHEAD);
+ readaheadLength = conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_IFILE_READAHEAD_BYTES);
+
+ doReadahead();
+ }
+
+ private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
+ FileDescriptor fd = null;
+ try {
+ if (in instanceof HasFileDescriptor) {
+ fd = ((HasFileDescriptor)in).getFileDescriptor();
+ } else if (in instanceof FileInputStream) {
+ fd = ((FileInputStream)in).getFD();
+ }
+ } catch (IOException e) {
+ LOG.info("Unable to determine FileDescriptor", e);
+ }
+ return fd;
+ }
+
+ /**
+ * Close the input stream. Note that we need to read to the end of the
+ * stream to validate the checksum.
+ */
+ @Override
+ public void close() throws IOException {
+
+ if (curReadahead != null) {
+ curReadahead.cancel();
+ }
+ if (currentOffset < dataLength) {
+ byte[] t = new byte[Math.min((int)
+ (Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
+ while (currentOffset < dataLength) {
+ int n = read(t, 0, t.length);
+ if (0 == n) {
+ throw new EOFException("Could not validate checksum");
+ }
+ }
+ }
+ in.close();
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ throw new IOException("Skip not supported for IFileInputStream");
+ }
+
+ public long getPosition() {
+ return (currentOffset >= dataLength) ? dataLength : currentOffset;
+ }
+
+ public long getSize() {
+ return checksumSize;
+ }
+
+ private void checksum(byte[] b, int off, int len) {
+ if(len >= buffer.length) {
+ sum.update(buffer, 0, offset);
+ offset = 0;
+ sum.update(b, off, len);
+ return;
+ }
+ final int remaining = buffer.length - offset;
+ if(len > remaining) {
+ sum.update(buffer, 0, offset);
+ offset = 0;
+ }
+ /* now we should have len < buffer.length */
+ System.arraycopy(b, off, buffer, offset, len);
+ offset += len;
+ }
+
+ /**
+ * Read bytes from the stream.
+ * At EOF, checksum is validated, but the checksum
+ * bytes are not passed back in the buffer.
+ */
+ public int read(byte[] b, int off, int len) throws IOException {
+
+ if (currentOffset >= dataLength) {
+ return -1;
+ }
+
+ doReadahead();
+
+ return doRead(b,off,len);
+ }
+
+ private void doReadahead() {
+ if (raPool != null && inFd != null && readahead) {
+ curReadahead = raPool.readaheadStream(
+ "ifile", inFd,
+ currentOffset, readaheadLength, dataLength,
+ curReadahead);
+ }
+ }
+
+ /**
+ * Read bytes from the stream.
+ * At EOF, checksum is validated and sent back
+ * as the last four bytes of the buffer. The caller should handle
+ * these bytes appropriately.
+ */
+ public int readWithChecksum(byte[] b, int off, int len) throws IOException {
+
+ if (currentOffset == length) {
+ return -1;
+ }
+ else if (currentOffset >= dataLength) {
+ // If the previous read drained off all the data, then just return
+ // the checksum now. Note that checksum validation would have
+ // happened in the earlier read
+ int lenToCopy = (int) (checksumSize - (currentOffset - dataLength));
+ if (len < lenToCopy) {
+ lenToCopy = len;
+ }
+ System.arraycopy(csum, (int) (currentOffset - dataLength), b, off,
+ lenToCopy);
+ currentOffset += lenToCopy;
+ return lenToCopy;
+ }
+
+ int bytesRead = doRead(b,off,len);
+
+ if (currentOffset == dataLength) {
+ if (len >= bytesRead + checksumSize) {
+ System.arraycopy(csum, 0, b, off + bytesRead, checksumSize);
+ bytesRead += checksumSize;
+ currentOffset += checksumSize;
+ }
+ }
+ return bytesRead;
+ }
+
+ private int doRead(byte[]b, int off, int len) throws IOException {
+
+ // If we are trying to read past the end of the data, just read
+ // the leftover data
+ if (currentOffset + len > dataLength) {
+ len = (int) dataLength - (int)currentOffset;
+ }
+
+ int bytesRead = in.read(b, off, len);
+
+ if (bytesRead < 0) {
+ throw new ChecksumException("Checksum Error", 0);
+ }
+
+ checksum(b, off, bytesRead);
+
+ currentOffset += bytesRead;
+
+ if (disableChecksumValidation) {
+ return bytesRead;
+ }
+
+ if (currentOffset == dataLength) {
+ // The last four bytes are checksum. Strip them and verify
+ sum.update(buffer, 0, offset);
+ csum = new byte[checksumSize];
+ IOUtils.readFully(in, csum, 0, checksumSize);
+ if (!sum.compare(csum, 0)) {
+ throw new ChecksumException("Checksum Error", 0);
+ }
+ }
+ return bytesRead;
+ }
+
+
+ @Override
+ public int read() throws IOException {
+ b[0] = 0;
+ int l = read(b,0,1);
+ if (l < 0) return l;
+
+ // Upgrade the b[0] to an int so as not to misinterpret the
+ // first bit of the byte as a sign bit
+ int result = 0xFF & b[0];
+ return result;
+ }
+
+ public byte[] getChecksum() {
+ return csum;
+ }
+
+ void disableChecksumValidation() {
+ disableChecksumValidation = true;
+ }
+}
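
The read path strips and verifies the trailing checksum transparently. A
minimal sketch of consuming such a file, assuming its bytes were produced by
the IFileOutputStream that follows in this commit (the path is illustrative):

    import java.io.File;
    import java.io.FileInputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;

    public class ChecksummedRead {
      public static void main(String[] args) throws Exception {
        File f = new File("/tmp/example.sums");  // illustrative path
        // the length passed in must include the trailing checksum bytes
        IFileInputStream in = new IFileInputStream(
            new FileInputStream(f), f.length(), new Configuration());
        byte[] buf = new byte[4096];
        int n;
        while ((n = in.read(buf, 0, buf.length)) != -1) {
          // read() returns data bytes only; the checksum is consumed and
          // compared internally, throwing ChecksumException on mismatch
        }
        in.close(); // close() drains any unread data so validation runs
      }
    }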
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileOutputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileOutputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileOutputStream.java
new file mode 100644
index 0000000..3198446
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileOutputStream.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.DataChecksum;
+/**
+ * A Checksum output stream.
+ * Checksum for the contents of the file is calculated and
+ * appended to the end of the file on close of the stream.
+ * Used for IFiles.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class IFileOutputStream extends FilterOutputStream {
+
+ /**
+ * The checksum computed over the bytes written to the stream.
+ */
+ private final DataChecksum sum;
+ private byte[] barray;
+ private byte[] buffer;
+ private int offset;
+ private boolean closed = false;
+ private boolean finished = false;
+
+ /**
+ * Create a checksum output stream that writes
+ * the bytes to the given stream.
+ * @param out The output stream to wrap and checksum.
+ */
+ public IFileOutputStream(OutputStream out) {
+ super(out);
+ sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
+ Integer.MAX_VALUE);
+ barray = new byte[sum.getChecksumSize()];
+ buffer = new byte[4096];
+ offset = 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ finish();
+ out.close();
+ }
+
+ /**
+ * Finishes writing data to the output stream, by writing
+ * the checksum bytes to the end. The underlying stream is not closed.
+ * @throws IOException
+ */
+ public void finish() throws IOException {
+ if (finished) {
+ return;
+ }
+ finished = true;
+ sum.update(buffer, 0, offset);
+ sum.writeValue(barray, 0, false);
+ out.write (barray, 0, sum.getChecksumSize());
+ out.flush();
+ }
+
+ private void checksum(byte[] b, int off, int len) {
+ if(len >= buffer.length) {
+ sum.update(buffer, 0, offset);
+ offset = 0;
+ sum.update(b, off, len);
+ return;
+ }
+ final int remaining = buffer.length - offset;
+ if(len > remaining) {
+ sum.update(buffer, 0, offset);
+ offset = 0;
+ }
+ /*
+ // FIXME if needed re-enable this in debug mode
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("XXX checksum" +
+ " b=" + b + " off=" + off +
+ " buffer=" + " offset=" + offset +
+ " len=" + len);
+ }
+ */
+ /* now we should have len < buffer.length */
+ System.arraycopy(b, off, buffer, offset, len);
+ offset += len;
+ }
+
+ /**
+ * Write bytes to the stream.
+ */
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ checksum(b, off, len);
+ out.write(b,off,len);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ barray[0] = (byte) (b & 0xFF);
+ write(barray,0,1);
+ }
+
+}
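
The matching write side looks roughly like the sketch below; finish()
(invoked from close()) appends the CRC32, so the length on disk is the data
length plus the checksum size. The path is illustrative:

    import java.io.FileOutputStream;
    import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream;

    public class ChecksummedWrite {
      public static void main(String[] args) throws Exception {
        IFileOutputStream out = new IFileOutputStream(
            new FileOutputStream("/tmp/example.sums")); // illustrative path
        out.write("some payload".getBytes("UTF-8"));
        out.close(); // finish() appends the CRC32, then the file is closed
      }
    }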
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
new file mode 100644
index 0000000..1b153ca
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -0,0 +1,932 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.HashComparator;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.IndexedSorter;
+import org.apache.hadoop.util.Progress;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class PipelinedSorter extends ExternalSorter {
+
+ private static final Log LOG = LogFactory.getLog(PipelinedSorter.class);
+
+ /**
+ * The size of each record in the index file for the map-outputs.
+ */
+ public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
+
+ private final static int APPROX_HEADER_LENGTH = 150;
+
+ int partitionBits;
+
+ private static final int PARTITION = 0; // partition offset in acct
+ private static final int KEYSTART = 1; // key offset in acct
+ private static final int VALSTART = 2; // val offset in acct
+ private static final int VALLEN = 3; // val len in acct
+ private static final int NMETA = 4; // num meta ints
+ private static final int METASIZE = NMETA * 4; // size in bytes
+
+ // spill accounting
+ volatile Throwable sortSpillException = null;
+
+ int numSpills = 0;
+ int minSpillsForCombine;
+ private HashComparator hasher;
+ // SortSpans
+ private SortSpan span;
+ private ByteBuffer largeBuffer;
+ // Merger
+ private SpanMerger merger;
+ private ExecutorService sortmaster;
+
+ final ArrayList<TezSpillRecord> indexCacheList =
+ new ArrayList<TezSpillRecord>();
+ private int totalIndexCacheMemory;
+ private int indexCacheMemoryLimit;
+
+ public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
+ super.initialize(outputContext, conf, numOutputs);
+
+ partitionBits = bitcount(partitions)+1;
+
+ //sanity checks
+ final float spillper =
+ this.conf.getFloat(
+ TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_SPILL_PERCENT);
+ final int sortmb =
+ this.conf.getInt(
+ TezJobConfig.TEZ_RUNTIME_IO_SORT_MB,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_MB);
+ indexCacheMemoryLimit = this.conf.getInt(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
+ if (spillper > (float)1.0 || spillper <= (float)0.0) {
+ throw new IOException("Invalid \"" + TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT +
+ "\": " + spillper);
+ }
+ if ((sortmb & 0x7FF) != sortmb) {
+ throw new IOException(
+ "Invalid \"" + TezJobConfig.TEZ_RUNTIME_IO_SORT_MB + "\": " + sortmb);
+ }
+
+ // buffers and accounting
+ int maxMemUsage = sortmb << 20;
+ maxMemUsage -= maxMemUsage % METASIZE;
+ largeBuffer = ByteBuffer.allocate(maxMemUsage);
+ LOG.info(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB + " = " + sortmb);
+ // TODO: configurable setting?
+ span = new SortSpan(largeBuffer, 1024*1024, 16);
+ merger = new SpanMerger(comparator);
+ final int sortThreads =
+ this.conf.getInt(
+ TezJobConfig.TEZ_RUNTIME_SORT_THREADS,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_THREADS);
+ sortmaster = Executors.newFixedThreadPool(sortThreads);
+
+ // k/v serialization
+ if(comparator instanceof HashComparator) {
+ hasher = (HashComparator)comparator;
+ LOG.info("Using the HashComparator");
+ } else {
+ hasher = null;
+ }
+ valSerializer.open(span.out);
+ keySerializer.open(span.out);
+ minSpillsForCombine = this.conf.getInt(TezJobConfig.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
+ }
+
+ private int bitcount(int n) {
+ int bit = 0;
+ while(n!=0) {
+ bit++;
+ n >>= 1;
+ }
+ return bit;
+ }
+
+ public void sort() throws IOException {
+ SortSpan newSpan = span.next();
+
+ if(newSpan == null) {
+ // sort in the same thread, do not wait for the thread pool
+ merger.add(span.sort(sorter, comparator));
+ spill();
+ int items = 1024*1024;
+ int perItem = 16;
+ if(span.length() != 0) {
+ items = span.length();
+ perItem = span.kvbuffer.limit()/items;
+ items = (largeBuffer.capacity())/(METASIZE+perItem);
+ if(items > 1024*1024) {
+ // our goal is to have 1M splits and sort early
+ items = 1024*1024;
+ }
+ }
+ span = new SortSpan(largeBuffer, items, perItem);
+ } else {
+ // queue up the sort
+ SortTask task = new SortTask(span, sorter, comparator);
+ Future<SpanIterator> future = sortmaster.submit(task);
+ merger.add(future);
+ span = newSpan;
+ }
+ valSerializer.open(span.out);
+ keySerializer.open(span.out);
+ }
+
+ @Override
+ public void write(Object key, Object value)
+ throws IOException {
+ collect(
+ key, value, partitioner.getPartition(key, value, partitions));
+ }
+
+ /**
+ * Serialize the key, value to intermediate storage.
+ * When this method returns, kvindex must refer to sufficient unused
+ * storage to store one METADATA.
+ */
+ synchronized void collect(Object key, Object value, final int partition
+ ) throws IOException {
+ if (key.getClass() != keyClass) {
+ throw new IOException("Type mismatch in key from map: expected "
+ + keyClass.getName() + ", received "
+ + key.getClass().getName());
+ }
+ if (value.getClass() != valClass) {
+ throw new IOException("Type mismatch in value from map: expected "
+ + valClass.getName() + ", received "
+ + value.getClass().getName());
+ }
+ if (partition < 0 || partition >= partitions) {
+ throw new IOException("Illegal partition for " + key + " (" +
+ partition + ")");
+ }
+ if(span.kvmeta.remaining() < METASIZE) {
+ this.sort();
+ }
+ int keystart = span.kvbuffer.position();
+ int valstart = -1;
+ int valend = -1;
+ try {
+ keySerializer.serialize(key);
+ valstart = span.kvbuffer.position();
+ valSerializer.serialize(value);
+ valend = span.kvbuffer.position();
+ } catch(BufferOverflowException overflow) {
+ // restore position to before the partial write
+ span.kvbuffer.position(keystart);
+ this.sort();
+ // try again
+ this.collect(key, value, partition);
+ return;
+ }
+
+ int prefix = 0;
+
+ if(hasher != null) {
+ prefix = hasher.getHashCode(key);
+ }
+
+ prefix = (partition << (32 - partitionBits)) | (prefix >>> partitionBits);
+
+ /* maintain order as in PARTITION, KEYSTART, VALSTART, VALLEN */
+ span.kvmeta.put(prefix);
+ span.kvmeta.put(keystart);
+ span.kvmeta.put(valstart);
+ span.kvmeta.put(valend - valstart);
+ if((valstart - keystart) > span.keymax) {
+ span.keymax = (valstart - keystart);
+ }
+ if((valend - valstart) > span.valmax) {
+ span.valmax = (valend - valstart);
+ }
+ mapOutputRecordCounter.increment(1);
+ mapOutputByteCounter.increment(valend - keystart);
+ }
+
+ public void spill() throws IOException {
+ // create spill file
+ final long size = largeBuffer.capacity() +
+ (partitions * APPROX_HEADER_LENGTH);
+ final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+ final Path filename =
+ mapOutputFile.getSpillFileForWrite(numSpills, size);
+ FSDataOutputStream out = rfs.create(filename, true, 4096);
+
+ try {
+ merger.ready(); // wait for all the future results from sort threads
+ LOG.info("Spilling to " + filename.toString());
+ for (int i = 0; i < partitions; ++i) {
+ TezRawKeyValueIterator kvIter = merger.filter(i);
+ //write merged output to disk
+ long segmentStart = out.getPos();
+ Writer writer =
+ new Writer(conf, out, keyClass, valClass, codec,
+ spilledRecordsCounter);
+ writer.setRLE(merger.needsRLE());
+ if (combiner == null) {
+ while(kvIter.next()) {
+ writer.append(kvIter.getKey(), kvIter.getValue());
+ }
+ } else {
+ runCombineProcessor(kvIter, writer);
+ }
+ //close
+ writer.close();
+
+ // record offsets
+ final TezIndexRecord rec =
+ new TezIndexRecord(
+ segmentStart,
+ writer.getRawLength(),
+ writer.getCompressedLength());
+ spillRec.putIndex(rec, i);
+ }
+
+ Path indexFilename =
+ mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+ * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ // TODO: cache
+ spillRec.writeToFile(indexFilename, conf);
+ ++numSpills;
+ } catch(InterruptedException ie) {
+ // TODO: the combiner has been interrupted
+ } finally {
+ out.close();
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ final String uniqueIdentifier = outputContext.getUniqueIdentifier();
+ Path finalOutputFile =
+ mapOutputFile.getOutputFileForWrite(0); //TODO
+ Path finalIndexFile =
+ mapOutputFile.getOutputIndexFileForWrite(0); //TODO
+
+ LOG.info("Starting flush of map output");
+ span.end();
+ merger.add(span.sort(sorter, comparator));
+ spill();
+ sortmaster.shutdown();
+
+ largeBuffer = null;
+
+ if(numSpills == 1) {
+ // someday be able to pass this directly to shuffle
+ // without writing to disk
+ final Path filename =
+ mapOutputFile.getSpillFile(0);
+ Path indexFilename =
+ mapOutputFile.getSpillIndexFile(0);
+ sameVolRename(filename, finalOutputFile);
+ sameVolRename(indexFilename, finalIndexFile);
+ return;
+ }
+
+ //The output stream for the final single output file
+ FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
+
+ TezMerger.considerFinalMergeForProgress();
+
+ final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+ final ArrayList<TezSpillRecord> indexCacheList = new ArrayList<TezSpillRecord>();
+
+ for(int i = 0; i < numSpills; i++) {
+ // TODO: build this cache before
+ Path indexFilename = mapOutputFile.getSpillIndexFile(i);
+ TezSpillRecord spillIndex = new TezSpillRecord(indexFilename, conf);
+ indexCacheList.add(spillIndex);
+ }
+
+ for (int parts = 0; parts < partitions; parts++) {
+ //create the segments to be merged
+ List<Segment> segmentList =
+ new ArrayList<Segment>(numSpills);
+ for(int i = 0; i < numSpills; i++) {
+ Path spillFilename = mapOutputFile.getSpillFile(i);
+ TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
+
+ Segment s =
+ new Segment(conf, rfs, spillFilename, indexRecord.getStartOffset(),
+ indexRecord.getPartLength(), codec, true);
+ segmentList.add(i, s);
+ }
+
+ int mergeFactor =
+ this.conf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_FACTOR);
+ // sort the segments only if there are intermediate merges
+ boolean sortSegments = segmentList.size() > mergeFactor;
+ //merge
+ TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
+ keyClass, valClass, codec,
+ segmentList, mergeFactor,
+ new Path(uniqueIdentifier),
+ (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
+ nullProgressable, sortSegments,
+ null, spilledRecordsCounter,
+ null); // Not using any Progress in TezMerger. Should just work.
+
+ //write merged output to disk
+ long segmentStart = finalOut.getPos();
+ Writer writer =
+ new Writer(conf, finalOut, keyClass, valClass, codec,
+ spilledRecordsCounter);
+ writer.setRLE(merger.needsRLE());
+ if (combiner == null || numSpills < minSpillsForCombine) {
+ TezMerger.writeFile(kvIter, writer, nullProgressable, conf);
+ } else {
+ runCombineProcessor(kvIter, writer);
+ }
+
+ //close
+ writer.close();
+
+ // record offsets
+ final TezIndexRecord rec =
+ new TezIndexRecord(
+ segmentStart,
+ writer.getRawLength(),
+ writer.getCompressedLength());
+ spillRec.putIndex(rec, parts);
+ }
+
+ spillRec.writeToFile(finalIndexFile, conf);
+ finalOut.close();
+ for(int i = 0; i < numSpills; i++) {
+ Path indexFilename = mapOutputFile.getSpillIndexFile(i);
+ Path spillFilename = mapOutputFile.getSpillFile(i);
+ rfs.delete(indexFilename,true);
+ rfs.delete(spillFilename,true);
+ }
+ }
+
+ public void close() { }
+
+ private interface PartitionedRawKeyValueIterator extends TezRawKeyValueIterator {
+ int getPartition();
+ }
+
+ private class BufferStreamWrapper extends OutputStream
+ {
+ private final ByteBuffer out;
+ public BufferStreamWrapper(ByteBuffer out) {
+ this.out = out;
+ }
+
+ @Override
+ public void write(int b) throws IOException { out.put((byte)b); }
+ @Override
+ public void write(byte[] b) throws IOException { out.put(b); }
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException { out.put(b, off, len); }
+ }
+
+ protected class InputByteBuffer extends DataInputBuffer {
+ private byte[] buffer = new byte[256];
+ private ByteBuffer wrapped = ByteBuffer.wrap(buffer);
+ private void resize(int length) {
+ if(length > buffer.length) {
+ buffer = new byte[length];
+ wrapped = ByteBuffer.wrap(buffer);
+ }
+ wrapped.limit(length);
+ }
+ public void reset(ByteBuffer b, int start, int length) {
+ resize(length);
+ b.position(start);
+ b.get(buffer, 0, length);
+ super.reset(buffer, 0, length);
+ }
+ // clone-ish function
+ public void reset(DataInputBuffer clone) {
+ byte[] data = clone.getData();
+ int start = clone.getPosition();
+ int length = clone.getLength();
+ resize(length);
+ System.arraycopy(data, start, buffer, 0, length);
+ super.reset(buffer, 0, length);
+ }
+ }
+
+ private class SortSpan implements IndexedSortable {
+ final IntBuffer kvmeta;
+ final ByteBuffer kvbuffer;
+ final DataOutputStream out;
+ private RawComparator comparator;
+ final int imeta[] = new int[NMETA];
+ final int jmeta[] = new int[NMETA];
+ int keymax = 1;
+ int valmax = 1;
+ private int i,j;
+ private byte[] ki;
+ private byte[] kj;
+ private int index = 0;
+ private InputByteBuffer hay = new InputByteBuffer();
+ private long eq = 0;
+
+ public SortSpan(ByteBuffer source, int maxItems, int perItem) {
+ int capacity = source.remaining();
+ int metasize = METASIZE*maxItems;
+ int dataSize = maxItems * perItem;
+ if(capacity < (metasize+dataSize)) {
+ // try to allocate less meta space, because we have sample data
+ metasize = METASIZE*(capacity/(perItem+METASIZE));
+ }
+ ByteBuffer reserved = source.duplicate();
+ reserved.mark();
+ LOG.info("reserved.remaining() = "+reserved.remaining());
+ LOG.info("reserved.size = "+metasize);
+ reserved.position(metasize);
+ kvbuffer = reserved.slice();
+ reserved.flip();
+ reserved.limit(metasize);
+ kvmeta = reserved
+ .slice()
+ .order(ByteOrder.nativeOrder())
+ .asIntBuffer();
+ out = new DataOutputStream(
+ new BufferStreamWrapper(kvbuffer));
+ }
+
+ public SpanIterator sort(IndexedSorter sorter, RawComparator comparator) {
+ this.comparator = comparator;
+ ki = new byte[keymax];
+ kj = new byte[keymax];
+ LOG.info("begin sorting Span"+index + " ("+length()+")");
+ if(length() > 1) {
+ sorter.sort(this, 0, length(), nullProgressable);
+ }
+ LOG.info("done sorting Span"+index);
+ return new SpanIterator(this);
+ }
+
+ int offsetFor(int i) {
+ return (i * NMETA);
+ }
+
+ public void swap(final int mi, final int mj) {
+ final int kvi = offsetFor(mi);
+ final int kvj = offsetFor(mj);
+
+ kvmeta.position(kvi); kvmeta.get(imeta);
+ kvmeta.position(kvj); kvmeta.get(jmeta);
+ kvmeta.position(kvj); kvmeta.put(imeta);
+ kvmeta.position(kvi); kvmeta.put(jmeta);
+
+ if(i == mi || j == mj) i = -1;
+ if(i == mi || j == mj) j = -1;
+ }
+
+ public int compare(final int mi, final int mj) {
+ final int kvi = offsetFor(mi);
+ final int kvj = offsetFor(mj);
+ final int kvip = kvmeta.get(kvi + PARTITION);
+ final int kvjp = kvmeta.get(kvj + PARTITION);
+ // sort by partition
+ if (kvip != kvjp) {
+ return kvip - kvjp;
+ }
+
+ final int istart = kvmeta.get(kvi + KEYSTART);
+ final int jstart = kvmeta.get(kvj + KEYSTART);
+ final int ilen = kvmeta.get(kvi + VALSTART) - istart;
+ final int jlen = kvmeta.get(kvj + VALSTART) - jstart;
+
+ kvbuffer.position(istart);
+ kvbuffer.get(ki, 0, ilen);
+ kvbuffer.position(jstart);
+ kvbuffer.get(kj, 0, jlen);
+ // sort by key
+ final int cmp = comparator.compare(ki, 0, ilen, kj, 0, jlen);
+ if(cmp == 0) eq++;
+ return cmp;
+ }
+
+ public SortSpan next() {
+ ByteBuffer remaining = end();
+ if(remaining != null) {
+ int items = length();
+ int perItem = kvbuffer.position()/items;
+ SortSpan newSpan = new SortSpan(remaining, items, perItem);
+ newSpan.index = index+1;
+ return newSpan;
+ }
+ return null;
+ }
+
+ public int length() {
+ return kvmeta.limit()/NMETA;
+ }
+
+ public ByteBuffer end() {
+ ByteBuffer remaining = kvbuffer.duplicate();
+ remaining.position(kvbuffer.position());
+ remaining = remaining.slice();
+ kvbuffer.limit(kvbuffer.position());
+ kvmeta.limit(kvmeta.position());
+ int items = length();
+ if(items == 0) {
+ return null;
+ }
+ int perItem = kvbuffer.position()/items;
+ LOG.info(String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
+ if(remaining.remaining() < NMETA+perItem) {
+ return null;
+ }
+ return remaining;
+ }
+
+ private int compareInternal(DataInputBuffer needle, int needlePart, int index) {
+ int cmp = 0;
+ int keystart;
+ int valstart;
+ int partition;
+ partition = kvmeta.get(span.offsetFor(index) + PARTITION);
+ if(partition != needlePart) {
+ cmp = (partition-needlePart);
+ } else {
+ keystart = kvmeta.get(span.offsetFor(index) + KEYSTART);
+ valstart = kvmeta.get(span.offsetFor(index) + VALSTART);
+ // hay is allocated ahead of time
+ hay.reset(kvbuffer, keystart, valstart - keystart);
+ cmp = comparator.compare(hay.getData(),
+ hay.getPosition(), hay.getLength(),
+ needle.getData(),
+ needle.getPosition(), needle.getLength());
+ }
+ return cmp;
+ }
+
+ public long getEq() {
+ return eq;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Span[%d,%d]", NMETA*kvmeta.capacity(), kvbuffer.limit());
+ }
+ }
+
+ private class SpanIterator implements PartitionedRawKeyValueIterator, Comparable<SpanIterator> {
+ private int kvindex = -1;
+ private int maxindex;
+ private IntBuffer kvmeta;
+ private ByteBuffer kvbuffer;
+ private SortSpan span;
+ private InputByteBuffer key = new InputByteBuffer();
+ private InputByteBuffer value = new InputByteBuffer();
+ private Progress progress = new Progress();
+
+ private final int minrun = (1 << 4);
+
+ public SpanIterator(SortSpan span) {
+ this.kvmeta = span.kvmeta;
+ this.kvbuffer = span.kvbuffer;
+ this.span = span;
+ this.maxindex = (kvmeta.limit()/NMETA) - 1;
+ }
+
+ public DataInputBuffer getKey() throws IOException {
+ final int keystart = kvmeta.get(span.offsetFor(kvindex) + KEYSTART);
+ final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
+ key.reset(kvbuffer, keystart, valstart - keystart);
+ return key;
+ }
+
+ public DataInputBuffer getValue() throws IOException {
+ final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART);
+ final int vallen = kvmeta.get(span.offsetFor(kvindex) + VALLEN);
+ value.reset(kvbuffer, valstart, vallen);
+ return value;
+ }
+
+ public boolean next() throws IOException {
+ // caveat: since we use this as a comparable in the merger
+ if(kvindex == maxindex) return false;
+ if(kvindex % 100 == 0) {
+ progress.set((kvindex-maxindex) / maxindex);
+ }
+ kvindex += 1;
+ return true;
+ }
+
+ public void close() throws IOException {
+ }
+
+ public Progress getProgress() {
+ return progress;
+ }
+
+ public int getPartition() {
+ final int partition = kvmeta.get(span.offsetFor(kvindex) + PARTITION);
+ return partition;
+ }
+
+ public int size() {
+ return (maxindex - kvindex);
+ }
+
+ public int compareTo(SpanIterator other) {
+ try {
+ return span.compareInternal(other.getKey(), other.getPartition(), kvindex);
+ } catch(IOException ie) {
+ // since we're not reading off disk, how could getKey() throw exceptions?
+ }
+ return -1;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("SpanIterator<%d:%d> (span=%s)", kvindex, maxindex, span.toString());
+ }
+
+ /**
+ * bisect returns the next insertion point for a given raw key, skipping keys
+ * which are <= needle using a binary search instead of a linear comparison.
+ * This is far more efficient when long runs of identical keys occur.
+ * @param needle the serialized key to search for
+ * @param needlePart the partition the needle belongs to
+ * @return the number of records that can be skipped, or 0 if none
+ */
+ int bisect(DataInputBuffer needle, int needlePart) {
+ int start = kvindex;
+ int end = maxindex-1;
+ int mid = start;
+ int cmp = 0;
+
+ if(end - start < minrun) {
+ return 0;
+ }
+
+ if(span.compareInternal(needle, needlePart, start) > 0) {
+ return kvindex;
+ }
+
+ // bail out early if we haven't got a min run
+ if(span.compareInternal(needle, needlePart, start+minrun) > 0) {
+ return 0;
+ }
+
+ if(span.compareInternal(needle, needlePart, end) < 0) {
+ return end - kvindex;
+ }
+
+ boolean found = false;
+
+ // we sort 100k items, the max it can do is 20 loops, but break early
+ for(int i = 0; start < end && i < 16; i++) {
+ mid = start + (end - start)/2;
+ cmp = span.compareInternal(needle, needlePart, mid);
+ if(cmp == 0) {
+ start = mid;
+ found = true;
+ } else if(cmp < 0) {
+ start = mid;
+ found = true;
+ }
+ if(cmp > 0) {
+ end = mid;
+ }
+ }
+
+ if(found) {
+ return start - kvindex;
+ }
+ return 0;
+ }
+ }
+
+ private class SortTask implements Callable<SpanIterator> {
+ private final SortSpan sortable;
+ private final IndexedSorter sorter;
+ private final RawComparator comparator;
+
+ public SortTask(SortSpan sortable,
+ IndexedSorter sorter, RawComparator comparator) {
+ this.sortable = sortable;
+ this.sorter = sorter;
+ this.comparator = comparator;
+ }
+
+ public SpanIterator call() {
+ return sortable.sort(sorter, comparator);
+ }
+ }
+
+ private class PartitionFilter implements TezRawKeyValueIterator {
+ private final PartitionedRawKeyValueIterator iter;
+ private int partition;
+ private boolean dirty = false;
+ public PartitionFilter(PartitionedRawKeyValueIterator iter) {
+ this.iter = iter;
+ }
+ public DataInputBuffer getKey() throws IOException { return iter.getKey(); }
+ public DataInputBuffer getValue() throws IOException { return iter.getValue(); }
+ public void close() throws IOException { }
+ public Progress getProgress() {
+ return new Progress();
+ }
+ public boolean next() throws IOException {
+ if(dirty || iter.next()) {
+ int prefix = iter.getPartition();
+
+ if((prefix >>> (32 - partitionBits)) == partition) {
+ dirty = false; // we found what we were looking for, good
+ return true;
+ } else if(!dirty) {
+ dirty = true; // we did a lookahead and failed to find partition
+ }
+ }
+ return false;
+ }
+
+ public void reset(int partition) {
+ this.partition = partition;
+ }
+
+ public int getPartition() {
+ return this.partition;
+ }
+ }
+
+ private class SpanHeap extends java.util.PriorityQueue<SpanIterator> {
+ public SpanHeap() {
+ super(256);
+ }
+ /**
+ * {@link PriorityQueue}.poll() by a different name
+ * @return the head of the heap, or null if the heap is empty
+ */
+ public SpanIterator pop() {
+ return this.poll();
+ }
+ }
+
+ private class SpanMerger implements PartitionedRawKeyValueIterator {
+ private final RawComparator comparator;
+ InputByteBuffer key = new InputByteBuffer();
+ InputByteBuffer value = new InputByteBuffer();
+ int partition;
+
+ private ArrayList< Future<SpanIterator>> futures = new ArrayList< Future<SpanIterator>>();
+
+ private SpanHeap heap = new SpanHeap();
+ private PartitionFilter partIter;
+
+ private int gallop = 0;
+ private SpanIterator horse;
+ private long total = 0;
+ private long count = 0;
+ private long eq = 0;
+
+ public SpanMerger(RawComparator comparator) {
+ this.comparator = comparator;
+ partIter = new PartitionFilter(this);
+ }
+
+ public void add(SpanIterator iter) throws IOException{
+ if(iter.next()) {
+ heap.add(iter);
+ }
+ }
+
+ public void add(Future<SpanIterator> iter) throws IOException{
+ this.futures.add(iter);
+ }
+
+ public boolean ready() throws IOException, InterruptedException {
+ try {
+ SpanIterator iter = null;
+ while(this.futures.size() > 0) {
+ Future<SpanIterator> futureIter = this.futures.remove(0);
+ iter = futureIter.get();
+ this.add(iter);
+ }
+
+ StringBuilder sb = new StringBuilder();
+ for(SpanIterator sp: heap) {
+ sb.append(sp.toString());
+ sb.append(",");
+ total += sp.span.length();
+ eq += sp.span.getEq();
+ }
+ LOG.info("Heap = " + sb.toString());
+ return true;
+ } catch(Exception e) {
+ LOG.info(e.toString());
+ return false;
+ }
+ }
+
+ private SpanIterator pop() throws IOException {
+ if(gallop > 0) {
+ gallop--;
+ return horse;
+ }
+ SpanIterator current = heap.pop();
+ SpanIterator next = heap.peek();
+ if(next != null && current != null &&
+ ((Object)horse) == ((Object)current)) {
+ // TODO: a better threshold check
+ gallop = current.bisect(next.getKey(), next.getPartition())-1;
+ }
+ horse = current;
+ return current;
+ }
+
+ public boolean needsRLE() {
+ return (eq > 0.1 * total);
+ }
+
+ private SpanIterator peek() throws IOException {
+ if(gallop > 0) {
+ return horse;
+ }
+ return heap.peek();
+ }
+
+ public boolean next() throws IOException {
+ SpanIterator current = pop();
+
+ if(current != null) {
+ // keep local copies, since add() will move it all out
+ key.reset(current.getKey());
+ value.reset(current.getValue());
+ partition = current.getPartition();
+ if(gallop <= 0) {
+ this.add(current);
+ } else {
+ // galloping
+ current.next();
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public DataInputBuffer getKey() throws IOException { return key; }
+ public DataInputBuffer getValue() throws IOException { return value; }
+ public int getPartition() { return partition; }
+
+ public void close() throws IOException {
+ }
+
+ public Progress getProgress() {
+ // TODO
+ return new Progress();
+ }
+
+ public TezRawKeyValueIterator filter(int partition) {
+ partIter.reset(partition);
+ return partIter;
+ }
+
+ }
+}
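
One detail worth calling out: collect() packs the partition into the high
partitionBits of the 32-bit meta prefix and a key-hash prefix into the low
bits, which is what lets PartitionFilter recover the partition with a single
unsigned shift. A standalone sketch of that encoding (the concrete values
are illustrative):

    public class PrefixPacking {
      public static void main(String[] args) {
        int partitions = 10;
        int partitionBits = bitcount(partitions) + 1; // as in initialize()
        int partition = 7;
        int hash = "some key".hashCode();

        // collect(): partition in the high bits, hash prefix in the rest
        int prefix = (partition << (32 - partitionBits))
            | (hash >>> partitionBits);

        // PartitionFilter.next(): one shift recovers the partition
        int recovered = prefix >>> (32 - partitionBits);
        System.out.println(recovered == partition); // true
      }

      // mirrors PipelinedSorter.bitcount(): position of the highest set bit
      private static int bitcount(int n) {
        int bit = 0;
        while (n != 0) { bit++; n >>= 1; }
        return bit;
      }
    }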
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
new file mode 100644
index 0000000..95ae8eb
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common.sort.impl;
+
+public class TezIndexRecord {
+ private long startOffset;
+ private long rawLength;
+ private long partLength;
+
+ public TezIndexRecord() { }
+
+ public TezIndexRecord(long startOffset, long rawLength, long partLength) {
+ this.startOffset = startOffset;
+ this.rawLength = rawLength;
+ this.partLength = partLength;
+ }
+
+ public long getStartOffset() {
+ return startOffset;
+ }
+
+ public long getRawLength() {
+ return rawLength;
+ }
+
+ public long getPartLength() {
+ return partLength;
+ }
+}
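
Each of the three longs above occupies eight bytes in the spill index file,
which is where PipelinedSorter's MAP_OUTPUT_INDEX_RECORD_LENGTH of 24 comes
from. A small sketch of how a consumer would use one record to locate a
partition inside a merged output file (the values are illustrative):

    import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;

    public class IndexRecordDemo {
      public static void main(String[] args) {
        // 24 bytes on disk: 3 longs, one record per partition
        TezIndexRecord rec = new TezIndexRecord(
            0L /* startOffset */, 1024L /* rawLength */, 400L /* partLength */);
        System.out.println("seek to " + rec.getStartOffset()
            + ", read " + rec.getPartLength() + " bytes"
            + " (decompresses to " + rec.getRawLength() + ")");
      }
    }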