You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:48 UTC
[41/50] [abbrv] Rename tez-engine-api to tez-runtime-api and
tez-engine is split into 2: - tez-engine-library for user-visible
Input/Output/Processor implementations - tez-engine-internals for framework
internals
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
deleted file mode 100644
index a6d1c5b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
+++ /dev/null
@@ -1,572 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.shuffle.server;
-
-import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import javax.crypto.SecretKey;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterInt;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
-import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
-import org.apache.hadoop.yarn.server.api.AuxiliaryService;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
-import org.apache.tez.engine.common.security.JobTokenSecretManager;
-import org.apache.tez.engine.common.security.SecureShuffleUtils;
-import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
-import org.apache.tez.engine.common.sort.impl.ExternalSorter;
-import org.apache.tez.engine.shuffle.common.ShuffleUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedStream;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.util.CharsetUtil;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class ShuffleHandler extends AuxiliaryService {
-
- private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
-
- public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache";
- public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
-
- public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
- public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
-
- private int port;
- private ChannelFactory selector;
- private final ChannelGroup accepted = new DefaultChannelGroup();
- private HttpPipelineFactory pipelineFact;
- private int sslFileBufferSize;
-
- public static final String MAPREDUCE_SHUFFLE_SERVICEID =
- "mapreduce.shuffle";
-
- private static final Map<String,String> userRsrc =
- new ConcurrentHashMap<String,String>();
- private static final JobTokenSecretManager secretManager =
- new JobTokenSecretManager();
- private SecretKey tokenSecret;
-
- public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
- public static final int DEFAULT_SHUFFLE_PORT = 8080;
-
- public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
- "mapreduce.shuffle.ssl.file.buffer.size";
-
- public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
-
- private ExternalSorter sorter;
-
- /**
- * Metrics source for shuffle output. It also implements
- * {@link ChannelFutureListener} so that it can be attached to the write
- * future of a shuffle response and record the outcome when the write ends.
- */
- @Metrics(about="Shuffle output metrics", context="mapred")
- static class ShuffleMetrics implements ChannelFutureListener {
- @Metric("Shuffle output in bytes")
- MutableCounterLong shuffleOutputBytes;
- @Metric("# of failed shuffle outputs")
- MutableCounterInt shuffleOutputsFailed;
- @Metric("# of succeeeded shuffle outputs")
- MutableCounterInt shuffleOutputsOK;
- @Metric("# of current shuffle connections")
- MutableGaugeInt shuffleConnections;
-
- /**
- * Counts the finished write as a success or a failure and then decrements
- * the open-connection gauge in either case.
- */
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- shuffleOutputsOK.incr();
- } else {
- shuffleOutputsFailed.incr();
- }
- shuffleConnections.decr();
- }
- }
-
- final ShuffleMetrics metrics;
-
- /**
- * Creates the service under the name "httpshuffle" and registers a new
- * {@link ShuffleMetrics} instance with the given metrics system.
- * @param ms the metrics system the shuffle metrics are registered with
- */
- ShuffleHandler(MetricsSystem ms) {
- super("httpshuffle");
- metrics = ms.register(new ShuffleMetrics());
- }
-
- /**
- * Creates a handler that registers its metrics with
- * {@link DefaultMetricsSystem#instance()} and keeps the given sorter, which
- * is later asked for the shuffle header and the sorted stream of a partition.
- * @param sorter the sorter whose output this handler serves
- */
- public ShuffleHandler(ExternalSorter sorter) {
- this(DefaultMetricsSystem.instance());
- this.sorter = sorter;
- }
-
- /**
- * Serialize the shuffle port into a ByteBuffer for use later on.
- * The port is written as a single int; {@link #deserializeMetaData} reads
- * it back.
- * @param port the port to be sent to the ApplicationMaster
- * @return the serialized form of the port.
- * @throws IOException if writing to the in-memory buffer fails
- */
- public static ByteBuffer serializeMetaData(int port) throws IOException {
- //TODO these bytes should be versioned
- DataOutputBuffer port_dob = new DataOutputBuffer();
- port_dob.writeInt(port);
- // Wrap only the bytes actually written, not the whole backing array.
- return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
- }
-
- /**
- * Decodes the metadata published by the ShuffleHandler.
- * The buffer is expected to start with the int written by
- * {@link #serializeMetaData}.
- * @param meta the metadata returned by the ShuffleHandler
- * @return the port the Shuffle Handler is listening on to serve shuffle data.
- * @throws IOException if the int cannot be read from the buffer
- */
- public static int deserializeMetaData(ByteBuffer meta) throws IOException {
- //TODO this should be returning a class not just an int
- final DataInputByteBuffer metaIn = new DataInputByteBuffer();
- metaIn.reset(meta);
- return metaIn.readInt();
- }
-
- /**
- * Writes the given job token into a ByteBuffer so that it can be handed to
- * the ShuffleHandler as ServiceData.
- * @param jobToken the job token to be used for authentication of
- * shuffle data requests.
- * @return the serialized version of the jobToken.
- * @throws IOException if the token cannot be written
- */
- public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken) throws IOException {
- //TODO these bytes should be versioned
- final DataOutputBuffer tokenBytes = new DataOutputBuffer();
- jobToken.write(tokenBytes);
- final int written = tokenBytes.getLength();
- // Expose only the bytes that were written.
- return ByteBuffer.wrap(tokenBytes.getData(), 0, written);
- }
-
- /**
- * Rebuilds a job token from the bytes produced by
- * {@link #serializeServiceData}.
- * @param secret the serialized job token
- * @return a token populated from the buffer
- * @throws IOException if the token fields cannot be read
- */
- static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer secret) throws IOException {
- final DataInputByteBuffer secretIn = new DataInputByteBuffer();
- secretIn.reset(secret);
- final Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>();
- jobToken.readFields(secretIn);
- return jobToken;
- }
-
-
- /**
- * Registers an application with this handler: remembers the submitting user
- * under the application id and hands the deserialized job token to the
- * shared secret manager. An IOException while doing so is only logged; the
- * caller is not informed of the failure.
- * @param initAppContext supplies the user, application id and service data
- */
- @Override
- public void initializeApplication(
- ApplicationInitializationContext initAppContext) {
- // TODO these bytes should be versioned
- try {
- String user = initAppContext.getUser();
- ApplicationId appId = initAppContext.getApplicationId();
- ByteBuffer secret = initAppContext.getApplicationDataForService();
- Token<JobTokenIdentifier> jt = deserializeServiceData(secret);
- // TODO: Once SHuffle is out of NM, this can use MR APIs
- userRsrc.put(appId.toString(), user);
- LOG.info("Added token for " + appId.toString());
- secretManager.addTokenForJob(appId.toString(), jt);
- } catch (IOException e) {
- LOG.error("Error during initApp", e);
- // TODO add API to AuxiliaryServices to report failures
- }
- }
-
- /**
- * Forgets an application: drops its job token from the secret manager and
- * removes its user entry.
- * @param context supplies the id of the application that is terminating
- */
- @Override
- public void stopApplication(ApplicationTerminationContext context) {
- final String appKey = context.getApplicationId().toString();
- secretManager.removeTokenForJob(appKey);
- userRsrc.remove(appKey);
- }
-
- /**
- * Initializes the service with a copy of the given configuration and then
- * derives the token secret used by request verification from the service
- * consumer metadata registered under {@link #MAPREDUCE_SHUFFLE_SERVICEID}.
- * @param outputContext the output context supplying the service metadata
- * @param conf the configuration to copy for this service
- * @throws IOException declared by this method; presumably raised while
- * decoding the token bytes (confirm against ShuffleUtils)
- */
- public void initialize(TezOutputContext outputContext, Configuration conf) throws IOException {
- this.init(new Configuration(conf));
- tokenSecret = ShuffleUtils.getJobTokenSecretFromTokenBytes(outputContext.getServiceConsumerMetaData(MAPREDUCE_SHUFFLE_SERVICEID));
- }
-
- /**
- * Creates the Netty server channel factory, backed by two cached thread
- * pools whose threads are named "ShuffleHandler Netty Boss #n" and
- * "ShuffleHandler Netty Worker #n". The configuration argument is not read
- * here.
- */
- @Override
- public synchronized void serviceInit(Configuration conf) {
- ThreadFactory bossFactory = new ThreadFactoryBuilder()
- .setNameFormat("ShuffleHandler Netty Boss #%d")
- .build();
- ThreadFactory workerFactory = new ThreadFactoryBuilder()
- .setNameFormat("ShuffleHandler Netty Worker #%d")
- .build();
-
- selector = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory));
- }
-
- // TODO change AbstractService to throw InterruptedException
- /**
- * Builds the HTTP pipeline factory, binds the server to a port chosen by the
- * OS, and publishes that port: it is stored in this handler, written into
- * the configuration under {@link #SHUFFLE_PORT_CONFIG_KEY} and pushed into
- * the shared Shuffle channel handler. Any exception from building the
- * pipeline factory is rethrown wrapped in a RuntimeException.
- */
- @Override
- public synchronized void serviceStart() {
- Configuration conf = getConfig();
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
- try {
- pipelineFact = new HttpPipelineFactory(conf);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- bootstrap.setPipelineFactory(pipelineFact);
- // Let OS pick the port
- Channel ch = bootstrap.bind(new InetSocketAddress(0));
- // Track the server channel so serviceStop() can close it.
- accepted.add(ch);
- port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
- conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
- pipelineFact.SHUFFLE.setPort(port);
- LOG.info(getName() + " listening on port " + port);
-
- // Read only after the bind, so requests may be accepted before it is set.
- sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
- DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
- }
-
- /**
- * Closes every tracked channel (waiting up to 10 seconds), releases the
- * Netty resources held by the channel factory and destroys the pipeline
- * factory. The channel factory and the pipeline factory are only created by
- * serviceInit() and serviceStart(), so each is checked for null: stopping a
- * service that never started, or whose start failed, must not throw.
- */
- @Override
- public synchronized void serviceStop() {
- accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
- if (selector != null) {
- // A throwaway bootstrap is used only to release the factory's resources.
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
- bootstrap.releaseExternalResources();
- }
- if (pipelineFact != null) {
- pipelineFact.destroy();
- }
- }
-
- /**
- * Returns the listening port serialized by {@link #serializeMetaData}.
- * @return the serialized port, or null if serialization failed (the failure
- * is logged)
- */
- @Override
- public synchronized ByteBuffer getMetaData() {
- try {
- return serializeMetaData(port);
- } catch (IOException e) {
- LOG.error("Error during getMeta", e);
- // TODO add API to AuxiliaryServices to report failures
- return null;
- }
- }
-
- /**
- * Builds the Netty pipeline for each accepted connection. One Shuffle
- * handler instance is created up front and added to every pipeline; an
- * SSLFactory is created only when shuffle SSL is enabled in the
- * configuration.
- */
- class HttpPipelineFactory implements ChannelPipelineFactory {
-
- final Shuffle SHUFFLE;
- private SSLFactory sslFactory;
-
- /**
- * @param conf configuration used for the Shuffle handler and, when SSL is
- * enabled, for the server-mode SSLFactory
- * @throws Exception declared by this constructor; presumably raised when
- * the SSL factory cannot be initialized (confirm against SSLFactory)
- */
- public HttpPipelineFactory(Configuration conf) throws Exception {
- SHUFFLE = new Shuffle(conf);
- if (conf.getBoolean(TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_SSL,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_SSL)) {
- sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
- sslFactory.init();
- }
- }
-
- /** Destroys the SSL factory if one was created. */
- public void destroy() {
- if (sslFactory != null) {
- sslFactory.destroy();
- }
- }
-
- /**
- * Assembles the handlers in order: optional SSL, HTTP request decoder,
- * chunk aggregator (limit 1 << 16), HTTP response encoder, chunked writer
- * and finally the shared Shuffle handler.
- */
- @Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
- if (sslFactory != null) {
- pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
- }
- pipeline.addLast("decoder", new HttpRequestDecoder());
- pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
- pipeline.addLast("encoder", new HttpResponseEncoder());
- pipeline.addLast("chunking", new ChunkedWriteHandler());
- pipeline.addLast("shuffle", SHUFFLE);
- return pipeline;
- // TODO factor security manager into pipeline
- // TODO factor out encode/decode to permit binary shuffle
- // TODO factor out decode of index to permit alt. models
- }
-
- }
-
- class Shuffle extends SimpleChannelUpstreamHandler {
-
- private final Configuration conf;
- private int port;
-
- /**
- * Keeps the configuration and takes the initial port from
- * SHUFFLE_PORT_CONFIG_KEY, falling back to DEFAULT_SHUFFLE_PORT.
- * @param conf the configuration to read the port from
- */
- public Shuffle(Configuration conf) {
- this.conf = conf;
- this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
- }
-
- /**
- * Replaces the port used when rebuilding the request URL for verification.
- * @param port the port the server is actually bound to
- */
- public void setPort(int port) {
- this.port = port;
- }
-
- /**
- * Flattens the values of the "map" query parameter, each of which may hold
- * several comma-separated ids, into one list.
- * @param mapq the raw parameter values, or null when the parameter is absent
- * @return the individual ids in order of appearance, or null if mapq is null
- */
- private List<String> splitMaps(List<String> mapq) {
- if (mapq == null) {
- return null;
- }
- final List<String> mapIds = new ArrayList<String>();
- for (String commaSeparated : mapq) {
- for (String id : commaSeparated.split(",")) {
- mapIds.add(id);
- }
- }
- return mapIds;
- }
-
- /**
- * Handles one shuffle fetch request. The request must be a GET carrying
- * the expected shuffle name/version headers and the query parameters
- * "job", "reduce" (exactly one value each) and "map" (at least one id).
- * After the request hash has been verified, the OK response is written,
- * followed by the output for every requested map id; the channel is
- * closed once the last write completes. Every rejected request is answered
- * through sendError, which also closes the channel.
- */
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
- throws Exception {
- HttpRequest request = (HttpRequest) evt.getMessage();
- if (request.getMethod() != GET) {
- sendError(ctx, METHOD_NOT_ALLOWED);
- return;
- }
- // Check whether the shuffle version is compatible
- if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
- request.getHeader(ShuffleHeader.HTTP_HEADER_NAME))
- || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
- request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) {
- sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
- // The error response closes the channel; stop here rather than going on
- // to serve data to an incompatible client.
- return;
- }
- final Map<String,List<String>> q =
- new QueryStringDecoder(request.getUri()).getParameters();
- final List<String> mapIds = splitMaps(q.get("map"));
- final List<String> reduceQ = q.get("reduce");
- final List<String> jobQ = q.get("job");
- if (LOG.isDebugEnabled()) {
- LOG.debug("RECV: " + request.getUri() +
- "\n mapId: " + mapIds +
- "\n reduceId: " + reduceQ +
- "\n jobId: " + jobQ);
- }
-
- // An empty id list (e.g. "map=,", which String.split turns into no
- // elements) is rejected too: with nothing to send there would be no
- // write future to attach the completion listeners to.
- if (mapIds == null || mapIds.isEmpty() || reduceQ == null || jobQ == null) {
- sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
- return;
- }
- if (reduceQ.size() != 1 || jobQ.size() != 1) {
- sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
- return;
- }
- int reduceId;
- String jobId;
- try {
- reduceId = Integer.parseInt(reduceQ.get(0));
- jobId = jobQ.get(0);
- } catch (NumberFormatException e) {
- sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
- return;
- } catch (IllegalArgumentException e) {
- sendError(ctx, "Bad job parameter", BAD_REQUEST);
- return;
- }
-
- final String reqUri = request.getUri();
- if (null == reqUri) {
- // TODO? add upstream?
- sendError(ctx, FORBIDDEN);
- return;
- }
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
- try {
- verifyRequest(jobId, ctx, request, response,
- new URL("http", "", this.port, reqUri));
- } catch (IOException e) {
- LOG.warn("Shuffle failure ", e);
- sendError(ctx, e.getMessage(), UNAUTHORIZED);
- return;
- }
-
- Channel ch = evt.getChannel();
- ch.write(response);
- // TODO refactor the following into the pipeline
- ChannelFuture lastMap = null;
- for (String mapId : mapIds) {
- try {
- // TODO: Error handling - validate mapId via TezTaskAttemptId.forName
-
- // TODO NEWTEZ Fix this. TaskAttemptId is no longer valid. mapId validation will not work anymore.
-// if (!mapId.equals(sorter.getTaskAttemptId().toString())) {
-// String errorMessage =
-// "Illegal shuffle request mapId: " + mapId
-// + " while actual mapId is " + sorter.getTaskAttemptId();
-// LOG.warn(errorMessage);
-// sendError(ctx, errorMessage, BAD_REQUEST);
-// return;
-// }
-
- lastMap =
- sendMapOutput(ctx, ch, userRsrc.get(jobId), jobId, mapId, reduceId);
- if (null == lastMap) {
- sendError(ctx, NOT_FOUND);
- return;
- }
- } catch (IOException e) {
- LOG.error("Shuffle error ", e);
- sendError(ctx, e.getMessage(), INTERNAL_SERVER_ERROR);
- return;
- }
- }
- // mapIds is non-empty and every iteration either assigned a non-null
- // future or returned, so lastMap is set here.
- lastMap.addListener(metrics);
- lastMap.addListener(ChannelFutureListener.CLOSE);
- }
-
- /**
- * Authenticates a fetch request and prepares the authenticated reply.
- * The message built from the request URL is checked against the hash the
- * fetcher sent in the URL-hash header, using this handler's token secret.
- * On success a reply hash (computed over the fetcher's hash) and the
- * shuffle version headers are set on the response.
- * @param appid the job id from the request; used only in log messages
- * @param ctx not used by this method
- * @param request the incoming request carrying the URL-hash header
- * @param response the response that receives the reply hash and version
- * @param requestUri the URL the hash is expected to cover
- * @throws IOException if no token secret is available, the hash header is
- * missing, or (per the inline comment below) verification fails
- */
- private void verifyRequest(String appid, ChannelHandlerContext ctx,
- HttpRequest request, HttpResponse response, URL requestUri)
- throws IOException {
- // The secret is the handler-wide one set in initialize(), not one looked
- // up per application.
- if (null == tokenSecret) {
- LOG.info("Request for unknown token " + appid);
- throw new IOException("could not find jobid");
- }
- // string to encrypt
- String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri);
- // hash from the fetcher
- String urlHashStr =
- request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
- if (urlHashStr == null) {
- LOG.info("Missing header hash for " + appid);
- throw new IOException("fetcher cannot be authenticated");
- }
- if (LOG.isDebugEnabled()) {
- int len = urlHashStr.length();
- // Only the tail of the hash is logged, and without its last character.
- LOG.debug("verifying request. enc_str=" + enc_str + "; hash=..." +
- urlHashStr.substring(len-len/2, len-1));
- }
- // verify - throws exception
- SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);
- // verification passed - encode the reply
- String reply =
- SecureShuffleUtils.generateHash(urlHashStr.getBytes(), tokenSecret);
- response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
- addVersionToHeader(response);
- if (LOG.isDebugEnabled()) {
- int len = reply.length();
- LOG.debug("Fetcher request verfied. enc_str=" + enc_str + ";reply=" +
- reply.substring(len-len/2, len-1));
- }
- }
-
- /**
- * Writes the output of one partition to the channel: first the serialized
- * shuffle header obtained from the sorter, then the sorter's sorted stream
- * for that partition wrapped in a ChunkedStream.
- * Only ch and reduce are used; ctx, user, jobId and mapId are not read.
- * @param ch the channel to write to
- * @param reduce the partition whose header and stream are sent
- * @return the future of the stream write
- * @throws IOException if the header cannot be serialized
- */
- protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
- String user, String jobId, String mapId, int reduce)
- throws IOException {
- final ShuffleHeader header = sorter.getShuffleHeader(reduce);
- final DataOutputBuffer dob = new DataOutputBuffer();
- header.write(dob);
- ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-
- // sslFileBufferSize is the second ChunkedStream argument (presumably the
- // chunk size - confirm against the Netty API).
- ChannelFuture writeFuture =
- ch.write(
- new ChunkedStream(
- sorter.getSortedStream(reduce), sslFileBufferSize
- )
- );
- // Metrics are updated as soon as the write is queued, before it completes.
- metrics.shuffleConnections.incr();
- metrics.shuffleOutputBytes.incr(header.getCompressedLength()); // optimistic
- return writeFuture;
- }
-
- /**
- * Sends an error response with the given status and an empty body.
- * @param ctx the context of the channel to answer on
- * @param status the HTTP status to report
- */
- private void sendError(ChannelHandlerContext ctx,
- HttpResponseStatus status) {
- sendError(ctx, "", status);
- }
-
- /**
- * Sends a plain-text (UTF-8) error response carrying the shuffle version
- * headers, and closes the channel once the response has been written.
- * @param ctx the context of the channel to answer on
- * @param message the response body
- * @param status the HTTP status to report
- */
- private void sendError(ChannelHandlerContext ctx, String message,
- HttpResponseStatus status) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
- addVersionToHeader(response);
- response.setContent(
- ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
- // Close the connection as soon as the error message is sent.
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
- }
-
- /**
- * Sets the shuffle name and version headers on the response to this
- * handler's default values.
- * @param response the response to add the headers to
- */
- private void addVersionToHeader(HttpResponse response) {
- // Put shuffle version into http header
- response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
- ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
- response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
- ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
- }
-
- /**
- * Converts a pipeline failure into an HTTP error. An over-long frame is
- * answered with BAD_REQUEST; anything else is logged and, if the channel is
- * still connected, answered with INTERNAL_SERVER_ERROR.
- */
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
- throws Exception {
- final Throwable failure = e.getCause();
- if (failure instanceof TooLongFrameException) {
- sendError(ctx, BAD_REQUEST);
- return;
- }
-
- LOG.error("Shuffle error: ", failure);
- final Channel channel = e.getChannel();
- if (!channel.isConnected()) {
- // Nobody left to answer.
- return;
- }
- LOG.error("Shuffle error " + e);
- sendError(ctx, INTERNAL_SERVER_ERROR);
- }
-
- }
-
- /**
- * @return the port recorded by serviceStart() after binding; the field's
- * default value if the service has not been started
- */
- public int getPort() {
- return port;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
deleted file mode 100644
index 5aa0ddf..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.sort.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.util.IndexedSorter;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.QuickSort;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Partitioner;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.TezEngineUtils;
-import org.apache.tez.engine.common.combine.Combiner;
-import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.hadoop.compat.NullProgressable;
-
-@SuppressWarnings({"unchecked", "rawtypes"})
-public abstract class ExternalSorter {
-
- private static final Log LOG = LogFactory.getLog(ExternalSorter.class);
-
- public abstract void close() throws IOException;
-
- public abstract void flush() throws IOException;
-
- public abstract void write(Object key, Object value) throws IOException;
-
- protected Progressable nullProgressable = new NullProgressable();
- protected TezOutputContext outputContext;
- protected Combiner combiner;
- protected Partitioner partitioner;
- protected Configuration conf;
- protected FileSystem rfs;
- protected TezTaskOutput mapOutputFile;
- protected int partitions;
- protected Class keyClass;
- protected Class valClass;
- protected RawComparator comparator;
- protected SerializationFactory serializationFactory;
- protected Serializer keySerializer;
- protected Serializer valSerializer;
-
- protected IndexedSorter sorter;
-
- // Compression for map-outputs
- protected CompressionCodec codec;
-
- // Counters
- // TODO TEZ Rename all counter variables [Mapping of counter to MR for compatibility in the MR layer]
- protected TezCounter mapOutputByteCounter;
- protected TezCounter mapOutputRecordCounter;
- protected TezCounter fileOutputByteCounter;
- protected TezCounter spilledRecordsCounter;
-
- /**
- * Sets up everything the sorter needs from the configuration: the raw local
- * file system, the in-memory sorter implementation, the key comparator,
- * key/value serializers, task counters, the optional compression codec,
- * the task output manager, the partitioner and the combiner.
- * Note that the number of partitions is written back into the supplied
- * configuration before the partitioner is instantiated.
- * @param outputContext context supplying the counters and passed on to the
- * output manager and combiner factories
- * @param conf the configuration to read from (and write the partition count to)
- * @param numOutputs the number of partitions
- * @throws IOException declared by this method; presumably raised when the
- * local file system cannot be obtained (confirm)
- */
- public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
- this.outputContext = outputContext;
- this.conf = conf;
- this.partitions = numOutputs;
-
- rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();
-
- // sorter
- sorter = ReflectionUtils.newInstance(this.conf.getClass(
- TezJobConfig.TEZ_ENGINE_INTERNAL_SORTER_CLASS, QuickSort.class,
- IndexedSorter.class), this.conf);
-
- comparator = ConfigUtils.getIntermediateOutputKeyComparator(this.conf);
-
- // k/v serialization
- keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
- valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
- serializationFactory = new SerializationFactory(this.conf);
- keySerializer = serializationFactory.getSerializer(keyClass);
- valSerializer = serializationFactory.getSerializer(valClass);
-
- // counters
- mapOutputByteCounter =
- outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_BYTES);
- mapOutputRecordCounter =
- outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
- fileOutputByteCounter =
- outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
- spilledRecordsCounter =
- outputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
- // compression
- if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
- Class<? extends CompressionCodec> codecClass =
- ConfigUtils.getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, this.conf);
- } else {
- codec = null;
- }
-
- // Task outputs
- mapOutputFile = TezEngineUtils.instantiateTaskOutputManager(conf, outputContext);
-
- LOG.info("Instantiating Partitioner: [" + conf.get(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS) + "]");
- // The partition count must be in the configuration before the partitioner
- // is created from it on the next line.
- this.conf.setInt(TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS, this.partitions);
- this.partitioner = TezEngineUtils.instantiatePartitioner(this.conf);
- this.combiner = TezEngineUtils.instantiateCombiner(this.conf, outputContext);
- }
-
- /**
- * Exception indicating that the allocated sort buffer is insufficient to hold
- * the current record.
- */
- @SuppressWarnings("serial")
- public static class MapBufferTooSmallException extends IOException {
- /**
- * @param s the detail message
- */
- public MapBufferTooSmallException(String s) {
- super(s);
- }
- }
-
- /**
- * @return the task output manager created in initialize(); null before
- * initialize() has run
- */
- @Private
- public TezTaskOutput getMapOutput() {
- return mapOutputFile;
- }
-
- /**
- * Runs the configured combiner over the given records, writing the combined
- * output to the writer.
- * @param kvIter the records to combine
- * @param writer the destination of the combined records
- * @throws IOException if the combiner fails, or if the thread is interrupted
- * while combining (the InterruptedException is the cause and the
- * thread's interrupt status is restored)
- */
- protected void runCombineProcessor(TezRawKeyValueIterator kvIter,
- Writer writer) throws IOException {
- try {
- combiner.combine(kvIter, writer);
- } catch (InterruptedException e) {
- // Wrapping alone would swallow the interrupt; re-assert it so callers
- // further up can still observe that the thread was asked to stop.
- Thread.currentThread().interrupt();
- throw new IOException(e);
- }
- }
-
- /**
- * Rename srcPath to dstPath on the same volume. This is the same as
- * RawLocalFileSystem's rename method, except that it will not fall back to a
- * copy, and it will create the target directory if it doesn't exist.
- * @param srcPath the existing file
- * @param dstPath the new location
- * @throws IOException if the parent directory cannot be created or the
- * rename itself fails
- */
- protected void sameVolRename(Path srcPath, Path dstPath) throws IOException {
- RawLocalFileSystem rfs = (RawLocalFileSystem) this.rfs;
- File src = rfs.pathToFile(srcPath);
- File dst = rfs.pathToFile(dstPath);
- File dstParent = dst.getParentFile();
- if (dstParent != null && !dstParent.exists()) {
- // mkdirs() also returns false when another thread or process created the
- // directory in the meantime; only fail if it is really not there.
- if (!dstParent.mkdirs() && !dstParent.isDirectory()) {
- throw new IOException("Unable to rename " + src + " to " + dst
- + ": couldn't create parent directory");
- }
- }
-
- if (!src.renameTo(dst)) {
- throw new IOException("Unable to rename " + src + " to " + dst);
- }
- }
-
- /**
- * Default implementation: always throws. Subclasses that can serve their
- * sorted output as a stream are expected to override this.
- * @param partition the partition whose sorted data is requested
- * @throws UnsupportedOperationException always, in this base class
- */
- public InputStream getSortedStream(int partition) {
- throw new UnsupportedOperationException("getSortedStream isn't supported!");
- }
-
- /**
- * Default implementation: always throws. Subclasses that can describe their
- * output with a shuffle header are expected to override this.
- * @param reduce the partition the header is requested for
- * @throws UnsupportedOperationException always, in this base class
- */
- public ShuffleHeader getShuffleHeader(int reduce) {
- throw new UnsupportedOperationException("getShuffleHeader isn't supported!");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java
deleted file mode 100644
index 7d10606..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java
+++ /dev/null
@@ -1,559 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.sort.impl;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.BufferUtils;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.tez.common.counters.TezCounter;
-
-/**
- * <code>IFile</code> is the simple <key-len, value-len, key, value> format
- * for the intermediate map-outputs in Map-Reduce.
- *
- * There is a <code>Writer</code> to write out map-outputs in this format and
- * a <code>Reader</code> to read files of this format.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class IFile {
- private static final Log LOG = LogFactory.getLog(IFile.class);
- public static final int EOF_MARKER = -1; // End of File Marker
- public static final int RLE_MARKER = -2; // Repeat same key marker
- public static final DataInputBuffer REPEAT_KEY = new DataInputBuffer();
-
- /**
- * <code>IFile.Writer</code> to write out intermediate map-outputs.
- */
- @InterfaceAudience.Private
- @InterfaceStability.Unstable
- @SuppressWarnings({"unchecked", "rawtypes"})
- public static class Writer {
- FSDataOutputStream out;
- boolean ownOutputStream = false;
- long start = 0;
- FSDataOutputStream rawOut;
- AtomicBoolean closed = new AtomicBoolean(false);
-
- CompressionOutputStream compressedOut;
- Compressor compressor;
- boolean compressOutput = false;
-
- long decompressedBytesWritten = 0;
- long compressedBytesWritten = 0;
-
- // Count records written to disk
- private long numRecordsWritten = 0;
- private final TezCounter writtenRecordsCounter;
-
- IFileOutputStream checksumOut;
-
- Class keyClass;
- Class valueClass;
- Serializer keySerializer;
- Serializer valueSerializer;
-
- DataOutputBuffer buffer = new DataOutputBuffer();
- DataOutputBuffer previous = new DataOutputBuffer();
-
- // de-dup keys or not
- private boolean rle = false;
-
- public Writer(Configuration conf, FileSystem fs, Path file,
- Class keyClass, Class valueClass,
- CompressionCodec codec,
- TezCounter writesCounter) throws IOException {
- this(conf, fs.create(file), keyClass, valueClass, codec,
- writesCounter);
- ownOutputStream = true;
- }
-
- protected Writer(TezCounter writesCounter) {
- writtenRecordsCounter = writesCounter;
- }
-
- public Writer(Configuration conf, FSDataOutputStream out,
- Class keyClass, Class valueClass,
- CompressionCodec codec, TezCounter writesCounter)
- throws IOException {
- this.writtenRecordsCounter = writesCounter;
- this.checksumOut = new IFileOutputStream(out);
- this.rawOut = out;
- this.start = this.rawOut.getPos();
- if (codec != null) {
- this.compressor = CodecPool.getCompressor(codec);
- if (this.compressor != null) {
- this.compressor.reset();
- this.compressedOut = codec.createOutputStream(checksumOut, compressor);
- this.out = new FSDataOutputStream(this.compressedOut, null);
- this.compressOutput = true;
- } else {
- LOG.warn("Could not obtain compressor from CodecPool");
- this.out = new FSDataOutputStream(checksumOut,null);
- }
- } else {
- this.out = new FSDataOutputStream(checksumOut,null);
- }
-
- this.keyClass = keyClass;
- this.valueClass = valueClass;
-
- if (keyClass != null) {
- SerializationFactory serializationFactory =
- new SerializationFactory(conf);
- this.keySerializer = serializationFactory.getSerializer(keyClass);
- this.keySerializer.open(buffer);
- this.valueSerializer = serializationFactory.getSerializer(valueClass);
- this.valueSerializer.open(buffer);
- }
- }
-
- public Writer(Configuration conf, FileSystem fs, Path file)
- throws IOException {
- this(conf, fs, file, null, null, null, null);
- }
-
- public void close() throws IOException {
- if (closed.getAndSet(true)) {
- throw new IOException("Writer was already closed earlier");
- }
-
- // When IFile writer is created by BackupStore, we do not have
- // Key and Value classes set. So, check before closing the
- // serializers
- if (keyClass != null) {
- keySerializer.close();
- valueSerializer.close();
- }
-
- // Write EOF_MARKER for key/value length
- WritableUtils.writeVInt(out, EOF_MARKER);
- WritableUtils.writeVInt(out, EOF_MARKER);
- decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
-
- //Flush the stream
- out.flush();
-
- if (compressOutput) {
- // Flush
- compressedOut.finish();
- compressedOut.resetState();
- }
-
- // Close the underlying stream iff we own it...
- if (ownOutputStream) {
- out.close();
- }
- else {
- // Write the checksum
- checksumOut.finish();
- }
-
- compressedBytesWritten = rawOut.getPos() - start;
-
- if (compressOutput) {
- // Return back the compressor
- CodecPool.returnCompressor(compressor);
- compressor = null;
- }
-
- out = null;
- if(writtenRecordsCounter != null) {
- writtenRecordsCounter.increment(numRecordsWritten);
- }
- }
-
- public void append(Object key, Object value) throws IOException {
- if (key.getClass() != keyClass)
- throw new IOException("wrong key class: "+ key.getClass()
- +" is not "+ keyClass);
- if (value.getClass() != valueClass)
- throw new IOException("wrong value class: "+ value.getClass()
- +" is not "+ valueClass);
-
- boolean sameKey = false;
-
- // Append the 'key'
- keySerializer.serialize(key);
- int keyLength = buffer.getLength();
- if (keyLength < 0) {
- throw new IOException("Negative key-length not allowed: " + keyLength +
- " for " + key);
- }
-
- if(keyLength == previous.getLength()) {
- sameKey = (BufferUtils.compare(previous, buffer) == 0);
- }
-
- if(!sameKey) {
- BufferUtils.copy(buffer, previous);
- }
-
- // Append the 'value'
- valueSerializer.serialize(value);
- int valueLength = buffer.getLength() - keyLength;
- if (valueLength < 0) {
- throw new IOException("Negative value-length not allowed: " +
- valueLength + " for " + value);
- }
-
- if(sameKey) {
- WritableUtils.writeVInt(out, RLE_MARKER); // Same key as previous
- WritableUtils.writeVInt(out, valueLength); // value length
- out.write(buffer.getData(), keyLength, buffer.getLength()); // only the value
- // Update bytes written
- decompressedBytesWritten += 0 + valueLength +
- WritableUtils.getVIntSize(RLE_MARKER) +
- WritableUtils.getVIntSize(valueLength);
- } else {
- // Write the record out
- WritableUtils.writeVInt(out, keyLength); // key length
- WritableUtils.writeVInt(out, valueLength); // value length
- out.write(buffer.getData(), 0, buffer.getLength()); // data
- // Update bytes written
- decompressedBytesWritten += keyLength + valueLength +
- WritableUtils.getVIntSize(keyLength) +
- WritableUtils.getVIntSize(valueLength);
- }
-
- // Reset
- buffer.reset();
-
-
- ++numRecordsWritten;
- }
-
- public void append(DataInputBuffer key, DataInputBuffer value)
- throws IOException {
- int keyLength = key.getLength() - key.getPosition();
- if (keyLength < 0) {
- throw new IOException("Negative key-length not allowed: " + keyLength +
- " for " + key);
- }
-
- int valueLength = value.getLength() - value.getPosition();
- if (valueLength < 0) {
- throw new IOException("Negative value-length not allowed: " +
- valueLength + " for " + value);
- }
-
- boolean sameKey = false;
-
- if(rle && keyLength == previous.getLength()) {
- sameKey = (keyLength != 0) && (BufferUtils.compare(previous, key) == 0);
- }
-
- if(rle && sameKey) {
- WritableUtils.writeVInt(out, RLE_MARKER);
- WritableUtils.writeVInt(out, valueLength);
- out.write(value.getData(), value.getPosition(), valueLength);
-
- // Update bytes written
- decompressedBytesWritten += 0 + valueLength
- + WritableUtils.getVIntSize(RLE_MARKER)
- + WritableUtils.getVIntSize(valueLength);
- } else {
- WritableUtils.writeVInt(out, keyLength);
- WritableUtils.writeVInt(out, valueLength);
- out.write(key.getData(), key.getPosition(), keyLength);
- out.write(value.getData(), value.getPosition(), valueLength);
-
- // Update bytes written
- decompressedBytesWritten += keyLength + valueLength
- + WritableUtils.getVIntSize(keyLength)
- + WritableUtils.getVIntSize(valueLength);
-
- BufferUtils.copy(key, previous);
- }
- ++numRecordsWritten;
- }
-
- // Required for mark/reset
- public DataOutputStream getOutputStream () {
- return out;
- }
-
- // Required for mark/reset
- public void updateCountersForExternalAppend(long length) {
- ++numRecordsWritten;
- decompressedBytesWritten += length;
- }
-
- public long getRawLength() {
- return decompressedBytesWritten;
- }
-
- public long getCompressedLength() {
- return compressedBytesWritten;
- }
-
- public void setRLE(boolean rle) {
- this.rle = rle;
- previous.reset();
- }
-
- }
-
- /**
- * <code>IFile.Reader</code> to read intermediate map-outputs.
- */
- @InterfaceAudience.Private
- @InterfaceStability.Unstable
- public static class Reader {
-
- public enum KeyState {NO_KEY, NEW_KEY, SAME_KEY};
-
- private static final int DEFAULT_BUFFER_SIZE = 128*1024;
-
- // Count records read from disk
- private long numRecordsRead = 0;
- private final TezCounter readRecordsCounter;
-
- final InputStream in; // Possibly decompressed stream that we read
- Decompressor decompressor;
- public long bytesRead = 0;
- protected final long fileLength;
- protected boolean eof = false;
- final IFileInputStream checksumIn;
-
- protected byte[] buffer = null;
- protected int bufferSize = DEFAULT_BUFFER_SIZE;
- protected DataInputStream dataIn;
-
- protected int recNo = 1;
- protected int prevKeyLength;
- protected int currentKeyLength;
- protected int currentValueLength;
- byte keyBytes[] = new byte[0];
-
-
- /**
- * Construct an IFile Reader.
- *
- * @param conf Configuration File
- * @param fs FileSystem
- * @param file Path of the file to be opened. This file should have
- * checksum bytes for the data at the end of the file.
- * @param codec codec
- * @param readsCounter Counter for records read from disk
- * @throws IOException
- */
- public Reader(Configuration conf, FileSystem fs, Path file,
- CompressionCodec codec,
- TezCounter readsCounter) throws IOException {
- this(conf, fs.open(file),
- fs.getFileStatus(file).getLen(),
- codec, readsCounter);
- }
-
- /**
- * Construct an IFile Reader.
- *
- * @param conf Configuration File
- * @param in The input stream
- * @param length Length of the data in the stream, including the checksum
- * bytes.
- * @param codec codec
- * @param readsCounter Counter for records read from disk
- * @throws IOException
- */
- public Reader(Configuration conf, InputStream in, long length,
- CompressionCodec codec,
- TezCounter readsCounter) throws IOException {
- readRecordsCounter = readsCounter;
- checksumIn = new IFileInputStream(in,length, conf);
- if (codec != null) {
- decompressor = CodecPool.getDecompressor(codec);
- if (decompressor != null) {
- this.in = codec.createInputStream(checksumIn, decompressor);
- } else {
- LOG.warn("Could not obtain decompressor from CodecPool");
- this.in = checksumIn;
- }
- } else {
- this.in = checksumIn;
- }
- this.dataIn = new DataInputStream(this.in);
- this.fileLength = length;
-
- if (conf != null) {
- bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
- }
- }
-
- public long getLength() {
- return fileLength - checksumIn.getSize();
- }
-
- public long getPosition() throws IOException {
- return checksumIn.getPosition();
- }
-
- /**
- * Read upto len bytes into buf starting at offset off.
- *
- * @param buf buffer
- * @param off offset
- * @param len length of buffer
- * @return the no. of bytes read
- * @throws IOException
- */
- private int readData(byte[] buf, int off, int len) throws IOException {
- int bytesRead = 0;
- while (bytesRead < len) {
- int n = IOUtils.wrappedReadForCompressedData(in, buf, off + bytesRead,
- len - bytesRead);
- if (n < 0) {
- return bytesRead;
- }
- bytesRead += n;
- }
- return len;
- }
-
- protected boolean positionToNextRecord(DataInput dIn) throws IOException {
- // Sanity check
- if (eof) {
- throw new EOFException("Completed reading " + bytesRead);
- }
-
- // Read key and value lengths
- prevKeyLength = currentKeyLength;
- currentKeyLength = WritableUtils.readVInt(dIn);
- currentValueLength = WritableUtils.readVInt(dIn);
- bytesRead += WritableUtils.getVIntSize(currentKeyLength) +
- WritableUtils.getVIntSize(currentValueLength);
-
- // Check for EOF
- if (currentKeyLength == EOF_MARKER && currentValueLength == EOF_MARKER) {
- eof = true;
- return false;
- }
-
- // Sanity check
- if (currentKeyLength != RLE_MARKER && currentKeyLength < 0) {
- throw new IOException("Rec# " + recNo + ": Negative key-length: " +
- currentKeyLength);
- }
- if (currentValueLength < 0) {
- throw new IOException("Rec# " + recNo + ": Negative value-length: " +
- currentValueLength);
- }
-
- return true;
- }
-
- public boolean nextRawKey(DataInputBuffer key) throws IOException {
- return readRawKey(key) != KeyState.NO_KEY;
- }
-
- public KeyState readRawKey(DataInputBuffer key) throws IOException {
- if (!positionToNextRecord(dataIn)) {
- return KeyState.NO_KEY;
- }
- if(currentKeyLength == RLE_MARKER) {
- currentKeyLength = prevKeyLength;
- // no data to read
- key.reset(keyBytes, currentKeyLength);
- return KeyState.SAME_KEY;
- }
- if (keyBytes.length < currentKeyLength) {
- keyBytes = new byte[currentKeyLength << 1];
- }
- int i = readData(keyBytes, 0, currentKeyLength);
- if (i != currentKeyLength) {
- throw new IOException ("Asked for " + currentKeyLength + " Got: " + i);
- }
- key.reset(keyBytes, currentKeyLength);
- bytesRead += currentKeyLength;
- return KeyState.NEW_KEY;
- }
-
- public void nextRawValue(DataInputBuffer value) throws IOException {
- final byte[] valBytes =
- ((value.getData().length < currentValueLength) || (value.getData() == keyBytes))
- ? new byte[currentValueLength << 1]
- : value.getData();
- int i = readData(valBytes, 0, currentValueLength);
- if (i != currentValueLength) {
- throw new IOException ("Asked for " + currentValueLength + " Got: " + i);
- }
- value.reset(valBytes, currentValueLength);
-
- // Record the bytes read
- bytesRead += currentValueLength;
-
- ++recNo;
- ++numRecordsRead;
- }
-
- public void close() throws IOException {
- // Close the underlying stream
- in.close();
-
- // Release the buffer
- dataIn = null;
- buffer = null;
- if(readRecordsCounter != null) {
- readRecordsCounter.increment(numRecordsRead);
- }
-
- // Return the decompressor
- if (decompressor != null) {
- decompressor.reset();
- CodecPool.returnDecompressor(decompressor);
- decompressor = null;
- }
- }
-
- public void reset(int offset) {
- return;
- }
-
- public void disableChecksumValidation() {
- checksumIn.disableChecksumValidation();
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileInputStream.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileInputStream.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileInputStream.java
deleted file mode 100644
index dfb69f1..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileInputStream.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.sort.impl;
-
-import java.io.EOFException;
-import java.io.FileDescriptor;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.HasFileDescriptor;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.tez.common.TezJobConfig;
-/**
- * A checksum input stream, used for IFiles.
- * Used to validate the checksum of files created by {@link IFileOutputStream}.
-*/
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class IFileInputStream extends InputStream {
-
- private final InputStream in; //The input stream to be verified for checksum.
- private final FileDescriptor inFd; // the file descriptor, if it is known
- private final long length; //The total length of the input file
- private final long dataLength;
- private DataChecksum sum;
- private long currentOffset = 0;
- private final byte b[] = new byte[1];
- private byte csum[] = null;
- private int checksumSize;
- private byte[] buffer;
- private int offset;
-
- private ReadaheadRequest curReadahead = null;
- private ReadaheadPool raPool = ReadaheadPool.getInstance();
- private boolean readahead;
- private int readaheadLength;
-
- public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
-
- private boolean disableChecksumValidation = false;
-
- /**
- * Create a checksum input stream that reads
- * @param in The input stream to be verified for checksum.
- * @param len The length of the input stream including checksum bytes.
- */
- public IFileInputStream(InputStream in, long len, Configuration conf) {
- this.in = in;
- this.inFd = getFileDescriptorIfAvail(in);
- sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
- Integer.MAX_VALUE);
- checksumSize = sum.getChecksumSize();
- buffer = new byte[4096];
- offset = 0;
- length = len;
- dataLength = length - checksumSize;
-
- conf = (conf != null) ? conf : new Configuration();
- readahead = conf.getBoolean(TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD,
- TezJobConfig.DEFAULT_TEZ_ENGINE_IFILE_READAHEAD);
- readaheadLength = conf.getInt(TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD_BYTES,
- TezJobConfig.DEFAULT_TEZ_ENGINE_IFILE_READAHEAD_BYTES);
-
- doReadahead();
- }
-
- private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
- FileDescriptor fd = null;
- try {
- if (in instanceof HasFileDescriptor) {
- fd = ((HasFileDescriptor)in).getFileDescriptor();
- } else if (in instanceof FileInputStream) {
- fd = ((FileInputStream)in).getFD();
- }
- } catch (IOException e) {
- LOG.info("Unable to determine FileDescriptor", e);
- }
- return fd;
- }
-
- /**
- * Close the input stream. Note that we need to read to the end of the
- * stream to validate the checksum.
- */
- @Override
- public void close() throws IOException {
-
- if (curReadahead != null) {
- curReadahead.cancel();
- }
- if (currentOffset < dataLength) {
- byte[] t = new byte[Math.min((int)
- (Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
- while (currentOffset < dataLength) {
- int n = read(t, 0, t.length);
- if (0 == n) {
- throw new EOFException("Could not validate checksum");
- }
- }
- }
- in.close();
- }
-
- @Override
- public long skip(long n) throws IOException {
- throw new IOException("Skip not supported for IFileInputStream");
- }
-
- public long getPosition() {
- return (currentOffset >= dataLength) ? dataLength : currentOffset;
- }
-
- public long getSize() {
- return checksumSize;
- }
-
- private void checksum(byte[] b, int off, int len) {
- if(len >= buffer.length) {
- sum.update(buffer, 0, offset);
- offset = 0;
- sum.update(b, off, len);
- return;
- }
- final int remaining = buffer.length - offset;
- if(len > remaining) {
- sum.update(buffer, 0, offset);
- offset = 0;
- }
- /* now we should have len < buffer.length */
- System.arraycopy(b, off, buffer, offset, len);
- offset += len;
- }
-
- /**
- * Read bytes from the stream.
- * At EOF, checksum is validated, but the checksum
- * bytes are not passed back in the buffer.
- */
- public int read(byte[] b, int off, int len) throws IOException {
-
- if (currentOffset >= dataLength) {
- return -1;
- }
-
- doReadahead();
-
- return doRead(b,off,len);
- }
-
- private void doReadahead() {
- if (raPool != null && inFd != null && readahead) {
- curReadahead = raPool.readaheadStream(
- "ifile", inFd,
- currentOffset, readaheadLength, dataLength,
- curReadahead);
- }
- }
-
- /**
- * Read bytes from the stream.
- * At EOF, checksum is validated and sent back
- * as the last four bytes of the buffer. The caller should handle
- * these bytes appropriately
- */
- public int readWithChecksum(byte[] b, int off, int len) throws IOException {
-
- if (currentOffset == length) {
- return -1;
- }
- else if (currentOffset >= dataLength) {
- // If the previous read drained off all the data, then just return
- // the checksum now. Note that checksum validation would have
- // happened in the earlier read
- int lenToCopy = (int) (checksumSize - (currentOffset - dataLength));
- if (len < lenToCopy) {
- lenToCopy = len;
- }
- System.arraycopy(csum, (int) (currentOffset - dataLength), b, off,
- lenToCopy);
- currentOffset += lenToCopy;
- return lenToCopy;
- }
-
- int bytesRead = doRead(b,off,len);
-
- if (currentOffset == dataLength) {
- if (len >= bytesRead + checksumSize) {
- System.arraycopy(csum, 0, b, off + bytesRead, checksumSize);
- bytesRead += checksumSize;
- currentOffset += checksumSize;
- }
- }
- return bytesRead;
- }
-
- private int doRead(byte[]b, int off, int len) throws IOException {
-
- // If we are trying to read past the end of data, just read
- // the left over data
- if (currentOffset + len > dataLength) {
- len = (int) dataLength - (int)currentOffset;
- }
-
- int bytesRead = in.read(b, off, len);
-
- if (bytesRead < 0) {
- throw new ChecksumException("Checksum Error", 0);
- }
-
- checksum(b, off, bytesRead);
-
- currentOffset += bytesRead;
-
- if (disableChecksumValidation) {
- return bytesRead;
- }
-
- if (currentOffset == dataLength) {
- // The last four bytes are checksum. Strip them and verify
- sum.update(buffer, 0, offset);
- csum = new byte[checksumSize];
- IOUtils.readFully(in, csum, 0, checksumSize);
- if (!sum.compare(csum, 0)) {
- throw new ChecksumException("Checksum Error", 0);
- }
- }
- return bytesRead;
- }
-
-
- @Override
- public int read() throws IOException {
- b[0] = 0;
- int l = read(b,0,1);
- if (l < 0) return l;
-
- // Upgrade the b[0] to an int so as not to misinterpret the
- // first bit of the byte as a sign bit
- int result = 0xFF & b[0];
- return result;
- }
-
- public byte[] getChecksum() {
- return csum;
- }
-
- void disableChecksumValidation() {
- disableChecksumValidation = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
deleted file mode 100644
index 3b39900..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.sort.impl;
-
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.util.DataChecksum;
-/**
- * A Checksum output stream.
- * Checksum for the contents of the file is calculated and
- * appended to the end of the file on close of the stream.
- * Used for IFiles
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class IFileOutputStream extends FilterOutputStream {
-
- /**
- * The output stream to be checksummed.
- */
- private final DataChecksum sum;
- private byte[] barray;
- private byte[] buffer;
- private int offset;
- private boolean closed = false;
- private boolean finished = false;
-
- /**
- * Create a checksum output stream that writes
- * the bytes to the given stream.
- * @param out
- */
- public IFileOutputStream(OutputStream out) {
- super(out);
- sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
- Integer.MAX_VALUE);
- barray = new byte[sum.getChecksumSize()];
- buffer = new byte[4096];
- offset = 0;
- }
-
- @Override
- public void close() throws IOException {
- if (closed) {
- return;
- }
- closed = true;
- finish();
- out.close();
- }
-
- /**
- * Finishes writing data to the output stream, by writing
- * the checksum bytes to the end. The underlying stream is not closed.
- * @throws IOException
- */
- public void finish() throws IOException {
- if (finished) {
- return;
- }
- finished = true;
- sum.update(buffer, 0, offset);
- sum.writeValue(barray, 0, false);
- out.write (barray, 0, sum.getChecksumSize());
- out.flush();
- }
-
- private void checksum(byte[] b, int off, int len) {
- if(len >= buffer.length) {
- sum.update(buffer, 0, offset);
- offset = 0;
- sum.update(b, off, len);
- return;
- }
- final int remaining = buffer.length - offset;
- if(len > remaining) {
- sum.update(buffer, 0, offset);
- offset = 0;
- }
- /*
- // FIXME if needed re-enable this in debug mode
- if (LOG.isDebugEnabled()) {
- LOG.debug("XXX checksum" +
- " b=" + b + " off=" + off +
- " buffer=" + " offset=" + offset +
- " len=" + len);
- }
- */
- /* now we should have len < buffer.length */
- System.arraycopy(b, off, buffer, offset, len);
- offset += len;
- }
-
- /**
- * Write bytes to the stream.
- */
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- checksum(b, off, len);
- out.write(b,off,len);
- }
-
- @Override
- public void write(int b) throws IOException {
- barray[0] = (byte) (b & 0xFF);
- write(barray,0,1);
- }
-
-}