You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/12 02:45:20 UTC
git commit: TEZ-418. Change SimpleInput to work with the new engine
APIs (part of TEZ-398). (sseth)
Updated Branches:
refs/heads/TEZ-398 c86e0e40d -> 7974742bf
TEZ-418. Change SimpleInput to work with the new engine APIs (part of
TEZ-398). (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/7974742b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/7974742b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/7974742b
Branch: refs/heads/TEZ-398
Commit: 7974742bf50eb3427b3b2076c63f0f0714fba8ad
Parents: c86e0e4
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Sep 11 17:44:47 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Sep 11 17:44:47 2013 -0700
----------------------------------------------------------------------
.../engine/lib/input/ShuffledMergedInput.java | 2 +-
.../org/apache/tez/engine/newapi/KVReader.java | 4 +-
.../org/apache/tez/mapreduce/common/Utils.java | 47 +++
.../mapreduce/hadoop/newmapred/MRReporter.java | 70 ++++
.../newmapreduce/TaskAttemptContextImpl.java | 86 ++++
.../tez/mapreduce/newinput/SimpleInput.java | 417 +++++++++++++++++++
6 files changed, 623 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7974742b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
index fa7054a..5d67b0c 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
@@ -130,7 +130,7 @@ public class ShuffledMergedInput implements LogicalInput {
return new KVReader() {
@Override
- public boolean moveToNext() throws IOException {
+ public boolean next() throws IOException {
return vIter.moveToNext();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7974742b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
index bd0e933..b74c4eb 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
@@ -25,7 +25,7 @@ import java.io.IOException;
*
* Example usage
* <code>
- * while (kvReader.moveToNext()) {
+ * while (kvReader.next()) {
* KVRecord kvRecord = getCurrentKV();
* Object key = kvRecord.getKey();
* Iterable values = kvRecord.getValues();
@@ -41,7 +41,7 @@ public interface KVReader extends Reader {
* @throws IOException
* if an error occurs
*/
- public boolean moveToNext() throws IOException;
+ public boolean next() throws IOException;
/**
* Return the current key/value(s) pair. Use moveToNext() to advance.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7974742b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java
new file mode 100644
index 0000000..f7cd6f0
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java
@@ -0,0 +1,47 @@
+package org.apache.tez.mapreduce.common;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Static helpers used by the Tez MapReduce compatibility classes.
+ */
+public class Utils {
+
+  /**
+   * Gets handles to the Statistics instances registered for the scheme
+   * associated with a path.
+   *
+   * @param path the path. It is qualified against the FileSystem resolved
+   *          from {@code conf}, so a path without a scheme picks up the
+   *          scheme of that FileSystem.
+   * @param conf the configuration used to resolve the FileSystem for
+   *          {@code path}.
+   * @return the Statistics instances whose scheme equals the scheme of the
+   *         qualified path. The list is empty (never null) if none match.
+   * @throws IOException if the FileSystem for {@code path} cannot be obtained.
+   */
+  @Private
+  public static List<Statistics> getFsStatistics(Path path, Configuration conf) throws IOException {
+    Path qualifiedPath = path.getFileSystem(conf).makeQualified(path);
+    String scheme = qualifiedPath.toUri().getScheme();
+    List<Statistics> matchedStats = new ArrayList<Statistics>();
+    for (Statistics stats : FileSystem.getAllStatistics()) {
+      if (stats.getScheme().equals(scheme)) {
+        matchedStats.add(stats);
+      }
+    }
+    return matchedStats;
+  }
+
+  /**
+   * Creates an old-API ({@code mapred}) Counter from a Tez counter, so that
+   * Tez counters can be handed to MapReduce code.
+   *
+   * @param tezCounter the Tez counter; must not be null.
+   * @return an {@link MRCounters.MRCounter} constructed from {@code tezCounter}.
+   * @throws NullPointerException if {@code tezCounter} is null.
+   */
+  public static Counter getMRCounter(TezCounter tezCounter) {
+    Preconditions.checkNotNull(tezCounter);
+    return new MRCounters.MRCounter(tezCounter);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7974742b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapred/MRReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapred/MRReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapred/MRReporter.java
new file mode 100644
index 0000000..4638ab3
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapred/MRReporter.java
@@ -0,0 +1,70 @@
+package org.apache.tez.mapreduce.hadoop.newmapred;
+
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.tez.engine.newapi.TezTaskContext;
+import org.apache.tez.mapreduce.common.Utils;
+
+/**
+ * An old-API ({@code mapred}) {@link Reporter} backed by a Tez task context.
+ * Counter lookups and increments go to the context's counters. Progress and
+ * status updates are ignored.
+ */
+public class MRReporter implements Reporter {
+
+  private final TezTaskContext tezTaskContext;
+  // May be null; getInputSplit() throws in that case.
+  private final InputSplit split;
+
+  /**
+   * Creates a reporter without an input split.
+   *
+   * @param tezTaskContext the Tez context whose counters are used.
+   */
+  public MRReporter(TezTaskContext tezTaskContext) {
+    this(tezTaskContext, null);
+  }
+
+  /**
+   * Creates a reporter.
+   *
+   * @param tezTaskContext the Tez context whose counters are used.
+   * @param split the split returned by {@link #getInputSplit()}; may be null.
+   */
+  public MRReporter(TezTaskContext tezTaskContext, InputSplit split) {
+    this.tezTaskContext = tezTaskContext;
+    this.split = split;
+  }
+
+  @Override
+  public void progress() {
+    // Not reporting progress in Tez.
+  }
+
+  @Override
+  public void setStatus(String status) {
+    // Not setting status string in Tez.
+  }
+
+  @Override
+  public Counter getCounter(Enum<?> name) {
+    return Utils.getMRCounter(tezTaskContext.getCounters().findCounter(name));
+  }
+
+  @Override
+  public Counter getCounter(String group, String name) {
+    return Utils.getMRCounter(tezTaskContext.getCounters().findCounter(group, name));
+  }
+
+  @Override
+  public void incrCounter(Enum<?> key, long amount) {
+    getCounter(key).increment(amount);
+  }
+
+  @Override
+  public void incrCounter(String group, String counter, long amount) {
+    getCounter(group, counter).increment(amount);
+  }
+
+  /**
+   * @return the split supplied at construction.
+   * @throws UnsupportedOperationException if no split was supplied.
+   */
+  @Override
+  public InputSplit getInputSplit() throws UnsupportedOperationException {
+    if (split == null) {
+      throw new UnsupportedOperationException("Input only available on map");
+    }
+    return split;
+  }
+
+  /**
+   * @return always 0.0f; progress is not tracked by this reporter.
+   */
+  @Override
+  public float getProgress() {
+    // TODO NEWTEZ Does this make a difference to anything ?
+    return 0.0f;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7974742b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
new file mode 100644
index 0000000..5b70b31
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
@@ -0,0 +1,86 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.newmapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.mapreduce.common.Utils;
+
+// NOTE: NEWTEZ: This is a copy of org.apache.tez.mapreduce.hadoop.mapred (not mapreduce). mapred likely does not need its own copy of this class.
+// Meant for use by the "mapreduce" API
+
+/**
+ * A new-API ({@code mapreduce}) TaskAttemptContext created from a Tez input
+ * context. Counter lookups go to the input context's counters. Progress is
+ * reported as 0 and progress() is a no-op.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TaskAttemptContextImpl
+    extends org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl {
+  private final TezInputContext inputContext;
+
+  /**
+   * Creates a context whose TaskAttemptID is derived from the input context.
+   *
+   * @param conf the job configuration.
+   * @param inputContext the Tez input context, used for ids and counters.
+   */
+  public TaskAttemptContextImpl(Configuration conf, TezInputContext inputContext) {
+    super(conf, toTaskAttemptId(inputContext));
+    this.inputContext = inputContext;
+  }
+
+  /**
+   * Builds a TaskAttemptID from the application id (cluster timestamp and
+   * id), the task index and the attempt number of the input context.
+   */
+  private static TaskAttemptID toTaskAttemptId(TezInputContext inputContext) {
+    // FIXME we need to use DAG Id but we are using App Id
+    // TODO NEWTEZ Figure out how to compute the TaskType - MAP or REDUCE. For
+    // SimpleInput, it likely doesn't matter - but setting it to MAP
+    // TODO NEWTEZ Can the jt Identifier string be inputContext.getUniqueId ?
+    String jtIdentifier =
+        String.valueOf(inputContext.getApplicationId().getClusterTimestamp());
+    TaskID taskId = new TaskID(jtIdentifier,
+        inputContext.getApplicationId().getId(), TaskType.MAP,
+        inputContext.getTaskIndex());
+    return new TaskAttemptID(taskId, inputContext.getAttemptNumber());
+  }
+
+  /**
+   * @return always 0.0f; progress is not tracked.
+   */
+  @Override
+  public float getProgress() {
+    // TODO NEWTEZ Will this break anything ?
+    return 0.0f;
+  }
+
+  @Override
+  public Counter getCounter(Enum<?> counterName) {
+    return Utils.getMRCounter(inputContext.getCounters().findCounter(counterName));
+  }
+
+  @Override
+  public Counter getCounter(String groupName, String counterName) {
+    return Utils.getMRCounter(inputContext.getCounters().findCounter(groupName, counterName));
+  }
+
+  /**
+   * Report progress. This is a no-op.
+   */
+  @Override
+  public void progress() {
+    // Nothing to do.
+  }
+
+  /**
+   * Set the current status of the task to the given string. The string is
+   * only stored locally via setStatusString.
+   */
+  @Override
+  public void setStatus(String status) {
+    setStatusString(status);
+    // Nothing to do until InputContext supports some kind of custom string
+    // diagnostics.
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7974742b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
new file mode 100644
index 0000000..616ce35
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.newinput;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.KVReader;
+import org.apache.tez.engine.newapi.LogicalInput;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.mapreduce.common.Utils;
+import org.apache.tez.mapreduce.hadoop.newmapred.MRReporter;
+import org.apache.tez.mapreduce.hadoop.newmapreduce.TaskAttemptContextImpl;
+
+/**
+ * {@link SimpleInput} is a {@link LogicalInput} which provides key/value pairs
+ * for the consumer.
+ *
+ * It is compatible with all standard Apache Hadoop MapReduce
+ * {@link InputFormat} implementations. Whether the old ({@code mapred}) or the
+ * new ({@code mapreduce}) API is used is decided by
+ * {@link JobConf#getUseNewMapper()}.
+ */
+public class SimpleInput implements LogicalInput {
+
+  private static final Log LOG = LogFactory.getLog(SimpleInput.class);
+
+  private TezInputContext inputContext;
+
+  private JobConf jobConf;
+  // Only populated for old-API FileSplits (see setIncrementalConfigParams).
+  private Configuration incrementalConf;
+
+  boolean useNewApi;
+
+  org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext;
+
+  @SuppressWarnings("rawtypes")
+  private org.apache.hadoop.mapreduce.InputFormat newInputFormat;
+  @SuppressWarnings("rawtypes")
+  private org.apache.hadoop.mapreduce.RecordReader newRecordReader;
+  private org.apache.hadoop.mapreduce.InputSplit newInputSplit;
+
+  @SuppressWarnings("rawtypes")
+  private InputFormat oldInputFormat;
+  @SuppressWarnings("rawtypes")
+  private RecordReader oldRecordReader;
+
+  protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
+
+  // Setup the values iterator once, and set value on the same object each time
+  // to prevent lots of objects being created.
+  private final SimpleValueIterator valueIterator = new SimpleValueIterator();
+  private final SimpleIterable valueIterable = new SimpleIterable(valueIterator);
+
+  private TezCounter inputRecordCounter;
+  private TezCounter fileInputByteCounter;
+  private List<Statistics> fsStats;
+
+  /**
+   * Initializes this input. It reads the split assigned to this task, looks
+   * up the counters, and creates the record reader for the configured
+   * InputFormat.
+   *
+   * @param inputContext the context for this input; retained for later use.
+   * @return null; this input does not generate events.
+   * @throws IOException if the split information cannot be read, the
+   *           InputFormat class cannot be loaded, or the record reader cannot
+   *           be created.
+   */
+  @Override
+  public List<Event> initialize(TezInputContext inputContext) throws IOException {
+    // Retain the context: it is needed later to build the task attempt
+    // context and to update the SPLIT_RAW_BYTES counter while reading splits.
+    this.inputContext = inputContext;
+    Configuration conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+    this.jobConf = new JobConf(conf);
+
+    // Read split information.
+    TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
+    TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext.getTaskIndex()];
+    this.splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
+        thisTaskMetaInfo.getStartOffset());
+
+    // TODO NEWTEZ Rename this to be specific to SimpleInput. This Input, in
+    // theory, can be used by the MapProcessor, ReduceProcessor or a custom
+    // processor. (The processor could provide the counter though)
+    this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
+    this.fileInputByteCounter = inputContext.getCounters().findCounter(FileInputFormatCounter.BYTES_READ);
+
+    useNewApi = this.jobConf.getUseNewMapper();
+
+    if (useNewApi) {
+      initNewApiReader();
+    } else {
+      initOldApiReader();
+    }
+    return null;
+  }
+
+  /** Sets up the new-API InputFormat, split, statistics and record reader. */
+  private void initNewApiReader() throws IOException {
+    this.taskAttemptContext = createTaskAttemptContext();
+    Class<? extends org.apache.hadoop.mapreduce.InputFormat<?, ?>> inputFormatClazz;
+    try {
+      inputFormatClazz = taskAttemptContext.getInputFormatClass();
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Unable to instantiate InputFormat class", e);
+    }
+
+    newInputFormat = ReflectionUtils.newInstance(inputFormatClazz, this.jobConf);
+
+    newInputSplit = getNewSplitDetails(splitMetaInfo);
+
+    List<Statistics> matchedStats = null;
+    if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
+      matchedStats = Utils.getFsStatistics(
+          ((org.apache.hadoop.mapreduce.lib.input.FileSplit) newInputSplit).getPath(),
+          this.jobConf);
+    }
+    fsStats = matchedStats;
+
+    // Count bytes read while opening the reader, as the old-API path does.
+    long bytesInPrev = getInputBytes();
+    try {
+      newRecordReader = newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
+      newRecordReader.initialize(newInputSplit, taskAttemptContext);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while creating record reader", e);
+    }
+    long bytesInCurr = getInputBytes();
+    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+  }
+
+  /** Sets up the old-API InputFormat, split, statistics and record reader. */
+  private void initOldApiReader() throws IOException {
+    oldInputFormat = this.jobConf.getInputFormat();
+    InputSplit oldInputSplit = getOldSplitDetails(splitMetaInfo);
+
+    List<Statistics> matchedStats = null;
+    if (oldInputSplit instanceof FileSplit) {
+      matchedStats = Utils.getFsStatistics(((FileSplit) oldInputSplit).getPath(), this.jobConf);
+    }
+    fsStats = matchedStats;
+
+    long bytesInPrev = getInputBytes();
+    oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit,
+        this.jobConf, new MRReporter(inputContext, oldInputSplit));
+    long bytesInCurr = getInputBytes();
+    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+    setIncrementalConfigParams(oldInputSplit);
+  }
+
+  /**
+   * Returns a reader over the records of this input. Each successful
+   * {@code next()} increments the input-record counter. Bytes read from the
+   * file system are added to the bytes-read counter.
+   */
+  @Override
+  public KVReader getReader() throws IOException {
+    return new KVReader() {
+
+      // Reusable holders for the old API: mapred RecordReader.next() fills
+      // caller-supplied key/value objects, so they must come from
+      // createKey()/createValue() rather than being null.
+      Object key;
+      Object value;
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public boolean next() throws IOException {
+        boolean hasNext = false;
+        long bytesInPrev = getInputBytes();
+        if (useNewApi) {
+          try {
+            hasNext = newRecordReader.nextKeyValue();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted while checking for next key-value", e);
+          }
+        } else {
+          if (key == null) {
+            key = oldRecordReader.createKey();
+            value = oldRecordReader.createValue();
+          }
+          hasNext = oldRecordReader.next(key, value);
+        }
+        long bytesInCurr = getInputBytes();
+        fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+
+        if (hasNext) {
+          inputRecordCounter.increment(1);
+        }
+
+        return hasNext;
+      }
+
+      @Override
+      public KVRecord getCurrentKV() throws IOException {
+        KVRecord kvRecord = null;
+        if (useNewApi) {
+          try {
+            valueIterator.setValue(newRecordReader.getCurrentValue());
+            kvRecord = new KVRecord(newRecordReader.getCurrentKey(), valueIterable);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted while fetching next key-value", e);
+          }
+
+        } else {
+          valueIterator.setValue(value);
+          kvRecord = new KVRecord(key, valueIterable);
+        }
+        return kvRecord;
+      }
+    };
+  }
+
+  @Override
+  public void handleEvents(List<Event> inputEvents) {
+    // Not expecting any events at the moment.
+  }
+
+  @Override
+  public void setNumPhysicalInputs(int numInputs) {
+    // Not required at the moment. May be required if splits are sent via events.
+  }
+
+  /**
+   * Closes the underlying record reader, if one was created. Bytes read
+   * while closing are added to the bytes-read counter.
+   *
+   * @return null; no events are generated.
+   */
+  public List<Event> close() throws IOException {
+    // Nothing to close if initialize() did not get as far as creating a reader.
+    if (newRecordReader == null && oldRecordReader == null) {
+      return null;
+    }
+    long bytesInPrev = getInputBytes();
+    if (useNewApi) {
+      newRecordReader.close();
+    } else {
+      oldRecordReader.close();
+    }
+    long bytesInCurr = getInputBytes();
+    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+
+    return null;
+  }
+
+  /**
+   * SimpleInput sets some additional parameters (input file, start offset and
+   * length) when the old MapReduce API is used with a {@link FileSplit}. This
+   * method returns those updates. Processors that use the old MapReduce API
+   * with SimpleInput should call it.
+   *
+   * @return a copy of the additional fields set by SimpleInput, or null if
+   *         none were set (new API, or a split that is not a FileSplit).
+   */
+  public Configuration getConfigUpdates() {
+    if (incrementalConf == null) {
+      return null;
+    }
+    return new Configuration(incrementalConf);
+  }
+
+  /**
+   * @return the progress reported by the underlying record reader.
+   */
+  public float getProgress() throws IOException, InterruptedException {
+    if (useNewApi) {
+      return newRecordReader.getProgress();
+    } else {
+      return oldRecordReader.getProgress();
+    }
+  }
+
+  /** Creates the new-API task attempt context from the retained input context. */
+  private TaskAttemptContext createTaskAttemptContext() {
+    return new TaskAttemptContextImpl(this.jobConf, inputContext);
+  }
+
+  /**
+   * Single-element iterator over the current record's value. One instance is
+   * reused for every record; {@link #setValue(Object)} re-arms it.
+   */
+  private static class SimpleValueIterator implements Iterator<Object> {
+
+    private Object value;
+    private int nextCount = 0;
+
+    public void setValue(Object value) {
+      this.value = value;
+      // Re-arm for the new record. Without this, hasNext() stays false once
+      // the first record's value has been consumed.
+      this.nextCount = 0;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return nextCount == 0;
+    }
+
+    @Override
+    public Object next() {
+      if (!hasNext()) {
+        throw new java.util.NoSuchElementException();
+      }
+      nextCount++;
+      Object current = this.value;
+      this.value = null;
+      return current;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /** Iterable that always hands out the same iterator instance. */
+  private static class SimpleIterable implements Iterable<Object> {
+    private final Iterator<Object> iterator;
+    public SimpleIterable(Iterator<Object> iterator) {
+      this.iterator = iterator;
+    }
+
+    @Override
+    public Iterator<Object> iterator() {
+      return iterator;
+    }
+  }
+
+  /** Reads this task's old-API ({@code mapred}) split from the split file. */
+  private InputSplit getOldSplitDetails(TaskSplitIndex splitMetaInfo)
+      throws IOException {
+    return this.<InputSplit>readSplit(splitMetaInfo);
+  }
+
+  /** Reads this task's new-API ({@code mapreduce}) split from the split file. */
+  private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetails(
+      TaskSplitIndex splitMetaInfo) throws IOException {
+    return this.<org.apache.hadoop.mapreduce.InputSplit>readSplit(splitMetaInfo);
+  }
+
+  /**
+   * Reads a split from the local split file. At the given offset the file
+   * holds the split's class name, followed by the split serialized with the
+   * Hadoop serialization framework. The number of bytes consumed is added to
+   * the SPLIT_RAW_BYTES counter. The file is always closed.
+   */
+  @SuppressWarnings("unchecked")
+  private <T> T readSplit(TaskSplitIndex splitIndex) throws IOException {
+    // Split information read from local filesystem.
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    Path file = fs.makeQualified(new Path(splitIndex.getSplitLocation()));
+    LOG.info("Reading input split file from : " + file);
+    long offset = splitIndex.getStartOffset();
+
+    FSDataInputStream inFile = fs.open(file);
+    try {
+      inFile.seek(offset);
+      String className = Text.readString(inFile);
+      Class<T> cls;
+      try {
+        cls = (Class<T>) jobConf.getClassByName(className);
+      } catch (ClassNotFoundException ce) {
+        throw new IOException("Split class " + className + " not found", ce);
+      }
+      SerializationFactory factory = new SerializationFactory(jobConf);
+      Deserializer<T> deserializer = factory.getDeserializer(cls);
+      deserializer.open(inFile);
+      T split = deserializer.deserialize(null);
+      long pos = inFile.getPos();
+      inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
+          .increment(pos - offset);
+      return split;
+    } finally {
+      inFile.close();
+    }
+  }
+
+  /**
+   * Records the file, start offset and length of an old-API FileSplit in
+   * {@code incrementalConf} (created here), and logs the split.
+   */
+  private void setIncrementalConfigParams(InputSplit inputSplit) {
+    if (inputSplit instanceof FileSplit) {
+      FileSplit fileSplit = (FileSplit) inputSplit;
+      this.incrementalConf = new Configuration(false);
+
+      this.incrementalConf.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath()
+          .toString());
+      this.incrementalConf.setLong(JobContext.MAP_INPUT_START,
+          fileSplit.getStart());
+      // NOTE(review): the split length is stored under MAP_INPUT_PATH. This
+      // appears to mirror Hadoop's MapTask, where that constant names the
+      // input-length key despite its name; confirm against MRJobConfig.
+      this.incrementalConf.setLong(JobContext.MAP_INPUT_PATH,
+          fileSplit.getLength());
+    }
+    LOG.info("Processing split: " + inputSplit);
+  }
+
+  /** Sums bytes read across the matched FileSystem statistics (0 if none). */
+  private long getInputBytes() {
+    if (fsStats == null) return 0;
+    long bytesRead = 0;
+    for (Statistics stat : fsStats) {
+      bytesRead = bytesRead + stat.getBytesRead();
+    }
+    return bytesRead;
+  }
+
+  /** Reads the split meta-information for all tasks from the local file system. */
+  protected TaskSplitMetaInfo[] readSplits(Configuration conf)
+      throws IOException {
+    TaskSplitMetaInfo[] allTaskSplitMetaInfo;
+    allTaskSplitMetaInfo = SplitMetaInfoReaderTez.readSplitMetaInfo(conf,
+        FileSystem.getLocal(conf));
+    return allTaskSplitMetaInfo;
+  }
+}