You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:50 UTC
[43/50] [abbrv] Rename tez-engine-api to tez-runtime-api and
tez-engine is split into 2: - tez-engine-library for user-visible
Input/Output/Processor implementations - tez-engine-internals for framework
internals
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
deleted file mode 100644
index b7867aa..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Iterates values while keys match in sorted input.
- *
- * This class is not thread safe. Accessing methods from multiple threads will
- * lead to corrupt data.
- *
- */
-public class ValuesIterator<KEY,VALUE> {
- protected TezRawKeyValueIterator in; //input iterator
- private KEY key; // current key
- private KEY nextKey;
- private VALUE value; // current value
- //private boolean hasNext; // more w/ this key
- private boolean more; // more in file
- private RawComparator<KEY> comparator;
- private Deserializer<KEY> keyDeserializer;
- private Deserializer<VALUE> valDeserializer;
- private DataInputBuffer keyIn = new DataInputBuffer();
- private DataInputBuffer valueIn = new DataInputBuffer();
- private TezCounter inputKeyCounter;
- private TezCounter inputValueCounter;
-
- private int keyCtr = 0;
- private boolean hasMoreValues; // For the current key.
- private boolean isFirstRecord = true;
-
- public ValuesIterator (TezRawKeyValueIterator in,
- RawComparator<KEY> comparator,
- Class<KEY> keyClass,
- Class<VALUE> valClass, Configuration conf,
- TezCounter inputKeyCounter,
- TezCounter inputValueCounter)
- throws IOException {
- this.in = in;
- this.comparator = comparator;
- this.inputKeyCounter = inputKeyCounter;
- this.inputValueCounter = inputValueCounter;
- SerializationFactory serializationFactory = new SerializationFactory(conf);
- this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
- this.keyDeserializer.open(keyIn);
- this.valDeserializer = serializationFactory.getDeserializer(valClass);
- this.valDeserializer.open(this.valueIn);
- }
-
- TezRawKeyValueIterator getRawIterator() { return in; }
-
- /**
- * Move to the next K-Vs pair
- * @return true if another pair exists, otherwise false.
- * @throws IOException
- */
- public boolean moveToNext() throws IOException {
- if (isFirstRecord) {
- readNextKey();
- key = nextKey;
- nextKey = null;
- hasMoreValues = more;
- isFirstRecord = false;
- } else {
- nextKey();
- }
- return more;
- }
-
- /** The current key. */
- public KEY getKey() {
- return key;
- }
-
- // TODO NEWTEZ Maybe add another method which returns an iterator instead of iterable
-
- public Iterable<VALUE> getValues() {
- return new Iterable<VALUE>() {
-
- @Override
- public Iterator<VALUE> iterator() {
-
- return new Iterator<VALUE>() {
-
- private final int keyNumber = keyCtr;
-
- @Override
- public boolean hasNext() {
- return hasMoreValues;
- }
-
- @Override
- public VALUE next() {
- if (!hasMoreValues) {
- throw new NoSuchElementException("iterate past last value");
- }
- Preconditions
- .checkState(
- keyNumber == keyCtr,
- "Cannot use values iterator on the previous K-V pair after moveToNext has been invoked to move to the next K-V pair");
-
- try {
- readNextValue();
- readNextKey();
- } catch (IOException ie) {
- throw new RuntimeException("problem advancing post rec#"+keyCtr, ie);
- }
- inputValueCounter.increment(1);
- return value;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Cannot remove elements");
- }
- };
- }
- };
- }
-
-
-
- /** Start processing next unique key. */
- private void nextKey() throws IOException {
- // read until we find a new key
- while (hasMoreValues) {
- readNextKey();
- }
- if (more) {
- inputKeyCounter.increment(1);
- ++keyCtr;
- }
-
- // move the next key to the current one
- KEY tmpKey = key;
- key = nextKey;
- nextKey = tmpKey;
- hasMoreValues = more;
- }
-
- /**
- * read the next key - which may be the same as the current key.
- */
- private void readNextKey() throws IOException {
- more = in.next();
- if (more) {
- DataInputBuffer nextKeyBytes = in.getKey();
- keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
- nextKey = keyDeserializer.deserialize(nextKey);
- hasMoreValues = key != null && (comparator.compare(key, nextKey) == 0);
- } else {
- hasMoreValues = false;
- }
- }
-
- /**
- * Read the next value
- * @throws IOException
- */
- private void readNextValue() throws IOException {
- DataInputBuffer nextValueBytes = in.getValue();
- valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
- value = valDeserializer.deserialize(value);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/YARNMaster.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/YARNMaster.java b/tez-engine/src/main/java/org/apache/tez/engine/common/YARNMaster.java
deleted file mode 100644
index 48ad639..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/YARNMaster.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-@Private
-@Unstable
-public class YARNMaster {
-
- public enum State {
- INITIALIZING, RUNNING;
- }
-
- public static String getMasterUserName(Configuration conf) {
- return conf.get(YarnConfiguration.RM_PRINCIPAL);
- }
-
- public static InetSocketAddress getMasterAddress(Configuration conf) {
- return conf.getSocketAddr(
- YarnConfiguration.RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_PORT);
- }
-
- public static String getMasterPrincipal(Configuration conf)
- throws IOException {
- String masterHostname = getMasterAddress(conf).getHostName();
- // get kerberos principal for use as delegation token renewer
- return SecurityUtil.getServerPrincipal(
- getMasterUserName(conf), masterHostname);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java
deleted file mode 100644
index b387b36..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/Combiner.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.combine;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-/**
- *<b>Combiner Initialization</b></p> The Combiner class is picked up
- * using the TEZ_ENGINE_COMBINER_CLASS attribute in {@link TezJobConfig}
- *
- *
- * Partitioners need to provide a single argument ({@link TezTaskContext})
- * constructor.
- */
-@Unstable
-@LimitedPrivate("mapreduce")
-public interface Combiner {
- public void combine(TezRawKeyValueIterator rawIter, Writer writer)
- throws InterruptedException, IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
deleted file mode 100644
index 546151f..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.localshuffle;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.sort.impl.TezMerger;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-
-@SuppressWarnings({"rawtypes"})
-public class LocalShuffle {
-
- // TODO NEWTEZ This is broken.
-
- private final TezInputContext inputContext;
- private final Configuration conf;
- private final int numInputs;
-
- private final Class keyClass;
- private final Class valClass;
- private final RawComparator comparator;
-
- private final FileSystem rfs;
- private final int sortFactor;
-
- private final TezCounter spilledRecordsCounter;
- private final CompressionCodec codec;
- private final TezTaskOutput mapOutputFile;
-
- public LocalShuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
- this.inputContext = inputContext;
- this.conf = conf;
- this.numInputs = numInputs;
-
- this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
- this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
- this.comparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
-
- this.sortFactor =
- conf.getInt(
- TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR,
- TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
-
- this.rfs = FileSystem.getLocal(conf).getRaw();
-
- this.spilledRecordsCounter = inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
-
- // compression
- if (ConfigUtils.isIntermediateInputCompressed(conf)) {
- Class<? extends CompressionCodec> codecClass =
- ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
- this.codec = ReflectionUtils.newInstance(codecClass, conf);
- } else {
- this.codec = null;
- }
-
- // Always local
- this.mapOutputFile = new TezLocalTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
- }
-
-
- public TezRawKeyValueIterator run() throws IOException {
- // Copy is complete, obviously!
-
-
- // Merge
- return TezMerger.merge(conf, rfs,
- keyClass, valClass,
- codec,
- getMapFiles(),
- false,
- sortFactor,
- new Path(inputContext.getUniqueIdentifier()), // TODO NEWTEZ This is likely broken
- comparator,
- null, spilledRecordsCounter, null, null);
- }
-
- private Path[] getMapFiles()
- throws IOException {
- List<Path> fileList = new ArrayList<Path>();
- // for local jobs
- for(int i = 0; i < numInputs; ++i) {
- //fileList.add(mapOutputFile.getInputFile(i));
- }
-
- return fileList.toArray(new Path[0]);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryImpl.java
deleted file mode 100644
index 351e01c..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryImpl.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.objectregistry;
-
-import java.util.AbstractMap;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import com.google.inject.Singleton;
-
-@Singleton
-public class ObjectRegistryImpl implements ObjectRegistry {
-
- private Map<String, Map.Entry<Object, ObjectLifeCycle>> objectCache =
- new HashMap<String, Map.Entry<Object, ObjectLifeCycle>>();
-
- @Override
- public synchronized Object add(ObjectLifeCycle lifeCycle,
- String key, Object value) {
- Map.Entry<Object, ObjectLifeCycle> oldEntry =
- objectCache.put(key,
- new AbstractMap.SimpleImmutableEntry<Object, ObjectLifeCycle>(
- value, lifeCycle));
- return oldEntry != null ? oldEntry.getKey() : null;
- }
-
- @Override
- public synchronized Object get(String key) {
- Map.Entry<Object, ObjectLifeCycle> entry =
- objectCache.get(key);
- return entry != null ? entry.getKey() : null;
- }
-
- @Override
- public synchronized boolean delete(String key) {
- return (null != objectCache.remove(key));
- }
-
- public synchronized void clearCache(ObjectLifeCycle lifeCycle) {
- for (Entry<String, Entry<Object, ObjectLifeCycle>> entry :
- objectCache.entrySet()) {
- if (entry.getValue().getValue().equals(lifeCycle)) {
- objectCache.remove(entry.getKey());
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryModule.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryModule.java b/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryModule.java
deleted file mode 100644
index ab346fd..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryModule.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.objectregistry;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.inject.AbstractModule;
-
-public class ObjectRegistryModule extends AbstractModule {
-
- private final ObjectRegistry objectRegistry;
-
- public ObjectRegistryModule(ObjectRegistry objectRegistry) {
- this.objectRegistry = objectRegistry;
- }
-
- @VisibleForTesting
- public ObjectRegistryModule() {
- objectRegistry = new ObjectRegistryImpl();
- }
-
- @Override
- protected void configure() {
- bind(ObjectRegistry.class).toInstance(this.objectRegistry);
- requestStaticInjection(ObjectRegistryFactory.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenIdentifier.java b/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenIdentifier.java
deleted file mode 100644
index 827001b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenIdentifier.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.security;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-
-/**
- * The token identifier for job token
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class JobTokenIdentifier extends TokenIdentifier {
- private Text jobid;
- public final static Text KIND_NAME = new Text("mapreduce.job");
-
- /**
- * Default constructor
- */
- public JobTokenIdentifier() {
- this.jobid = new Text();
- }
-
- /**
- * Create a job token identifier from a jobid
- * @param jobid the jobid to use
- */
- public JobTokenIdentifier(Text jobid) {
- this.jobid = jobid;
- }
-
- /** {@inheritDoc} */
- @Override
- public Text getKind() {
- return KIND_NAME;
- }
-
- /** {@inheritDoc} */
- @Override
- public UserGroupInformation getUser() {
- if (jobid == null || "".equals(jobid.toString())) {
- return null;
- }
- return UserGroupInformation.createRemoteUser(jobid.toString());
- }
-
- /**
- * Get the jobid
- * @return the jobid
- */
- public Text getJobId() {
- return jobid;
- }
-
- /** {@inheritDoc} */
- @Override
- public void readFields(DataInput in) throws IOException {
- jobid.readFields(in);
- }
-
- /** {@inheritDoc} */
- @Override
- public void write(DataOutput out) throws IOException {
- jobid.write(out);
- }
-
- @InterfaceAudience.Private
- public static class Renewer extends Token.TrivialRenewer {
- @Override
- protected Text getKind() {
- return KIND_NAME;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSecretManager.java b/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSecretManager.java
deleted file mode 100644
index d957b8b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSecretManager.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.security;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-import javax.crypto.SecretKey;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.Token;
-
-/**
- * SecretManager for job token. It can be used to cache generated job tokens.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class JobTokenSecretManager extends SecretManager<JobTokenIdentifier> {
- private final SecretKey masterKey;
- private final Map<String, SecretKey> currentJobTokens;
-
- /**
- * Convert the byte[] to a secret key
- * @param key the byte[] to create the secret key from
- * @return the secret key
- */
- public static SecretKey createSecretKey(byte[] key) {
- return SecretManager.createSecretKey(key);
- }
-
- /**
- * Compute the HMAC hash of the message using the key
- * @param msg the message to hash
- * @param key the key to use
- * @return the computed hash
- */
- public static byte[] computeHash(byte[] msg, SecretKey key) {
- return createPassword(msg, key);
- }
-
- /**
- * Default constructor
- */
- public JobTokenSecretManager() {
- this.masterKey = generateSecret();
- this.currentJobTokens = new TreeMap<String, SecretKey>();
- }
-
- /**
- * Create a new password/secret for the given job token identifier.
- * @param identifier the job token identifier
- * @return token password/secret
- */
- @Override
- public byte[] createPassword(JobTokenIdentifier identifier) {
- byte[] result = createPassword(identifier.getBytes(), masterKey);
- return result;
- }
-
- /**
- * Add the job token of a job to cache
- * @param jobId the job that owns the token
- * @param token the job token
- */
- public void addTokenForJob(String jobId, Token<JobTokenIdentifier> token) {
- SecretKey tokenSecret = createSecretKey(token.getPassword());
- synchronized (currentJobTokens) {
- currentJobTokens.put(jobId, tokenSecret);
- }
- }
-
- /**
- * Remove the cached job token of a job from cache
- * @param jobId the job whose token is to be removed
- */
- public void removeTokenForJob(String jobId) {
- synchronized (currentJobTokens) {
- currentJobTokens.remove(jobId);
- }
- }
-
- /**
- * Look up the token password/secret for the given jobId.
- * @param jobId the jobId to look up
- * @return token password/secret as SecretKey
- * @throws InvalidToken
- */
- public SecretKey retrieveTokenSecret(String jobId) throws InvalidToken {
- SecretKey tokenSecret = null;
- synchronized (currentJobTokens) {
- tokenSecret = currentJobTokens.get(jobId);
- }
- if (tokenSecret == null) {
- throw new InvalidToken("Can't find job token for job " + jobId + " !!");
- }
- return tokenSecret;
- }
-
- /**
- * Look up the token password/secret for the given job token identifier.
- * @param identifier the job token identifier to look up
- * @return token password/secret as byte[]
- * @throws InvalidToken
- */
- @Override
- public byte[] retrievePassword(JobTokenIdentifier identifier)
- throws InvalidToken {
- return retrieveTokenSecret(identifier.getJobId().toString()).getEncoded();
- }
-
- /**
- * Create an empty job token identifier
- * @return a newly created empty job token identifier
- */
- @Override
- public JobTokenIdentifier createIdentifier() {
- return new JobTokenIdentifier();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSelector.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSelector.java b/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSelector.java
deleted file mode 100644
index 080b9e2..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenSelector.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.security;
-
-import java.util.Collection;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
-
-/**
- * Look through tokens to find the first job token that matches the service
- * and return it.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class JobTokenSelector implements TokenSelector<JobTokenIdentifier> {
-
- @SuppressWarnings("unchecked")
- @Override
- public Token<JobTokenIdentifier> selectToken(Text service,
- Collection<Token<? extends TokenIdentifier>> tokens) {
- if (service == null) {
- return null;
- }
- for (Token<? extends TokenIdentifier> token : tokens) {
- if (JobTokenIdentifier.KIND_NAME.equals(token.getKind())
- && service.equals(token.getService())) {
- return (Token<JobTokenIdentifier>) token;
- }
- }
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java b/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java
deleted file mode 100644
index 20cad0a..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.security;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-@Private
-@Unstable
-public class Master {
-
- public enum State {
- INITIALIZING, RUNNING;
- }
-
- public static String getMasterUserName(Configuration conf) {
- return conf.get(YarnConfiguration.RM_PRINCIPAL);
- }
-
- public static InetSocketAddress getMasterAddress(Configuration conf) {
- return conf
- .getSocketAddr(YarnConfiguration.RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_PORT);
- }
-
- public static String getMasterPrincipal(Configuration conf)
- throws IOException {
- String masterHostname = getMasterAddress(conf).getHostName();
- // get kerberos principal for use as delegation token renewer
- return SecurityUtil.getServerPrincipal(getMasterUserName(conf),
- masterHostname);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/security/SecureShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/security/SecureShuffleUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/security/SecureShuffleUtils.java
deleted file mode 100644
index 71d990a..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/security/SecureShuffleUtils.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.security;
-
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.net.URL;
-
-import javax.crypto.SecretKey;
-import javax.servlet.http.HttpServletRequest;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.WritableComparator;
-
-/**
- *
- * utilities for generating kyes, hashes and verifying them for shuffle
- *
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class SecureShuffleUtils {
- public static final String HTTP_HEADER_URL_HASH = "UrlHash";
- public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
-
- /**
- * Base64 encoded hash of msg
- * @param msg
- */
- public static String generateHash(byte[] msg, SecretKey key) {
- return new String(Base64.encodeBase64(generateByteHash(msg, key)));
- }
-
- /**
- * calculate hash of msg
- * @param msg
- * @return
- */
- private static byte[] generateByteHash(byte[] msg, SecretKey key) {
- return JobTokenSecretManager.computeHash(msg, key);
- }
-
- /**
- * verify that hash equals to HMacHash(msg)
- * @param newHash
- * @return true if is the same
- */
- private static boolean verifyHash(byte[] hash, byte[] msg, SecretKey key) {
- byte[] msg_hash = generateByteHash(msg, key);
- return WritableComparator.compareBytes(msg_hash, 0, msg_hash.length, hash, 0, hash.length) == 0;
- }
-
- /**
- * Aux util to calculate hash of a String
- * @param enc_str
- * @param key
- * @return Base64 encodedHash
- * @throws IOException
- */
- public static String hashFromString(String enc_str, SecretKey key)
- throws IOException {
- return generateHash(enc_str.getBytes(), key);
- }
-
- /**
- * verify that base64Hash is same as HMacHash(msg)
- * @param base64Hash (Base64 encoded hash)
- * @param msg
- * @throws IOException if not the same
- */
- public static void verifyReply(String base64Hash, String msg, SecretKey key)
- throws IOException {
- byte[] hash = Base64.decodeBase64(base64Hash.getBytes());
-
- boolean res = verifyHash(hash, msg.getBytes(), key);
-
- if(res != true) {
- throw new IOException("Verification of the hashReply failed");
- }
- }
-
- /**
- * Shuffle specific utils - build string for encoding from URL
- * @param url
- * @return string for encoding
- */
- public static String buildMsgFrom(URL url) {
- return buildMsgFrom(url.getPath(), url.getQuery(), url.getPort());
- }
- /**
- * Shuffle specific utils - build string for encoding from URL
- * @param request
- * @return string for encoding
- */
- public static String buildMsgFrom(HttpServletRequest request ) {
- return buildMsgFrom(request.getRequestURI(), request.getQueryString(),
- request.getLocalPort());
- }
- /**
- * Shuffle specific utils - build string for encoding from URL
- * @param uri_path
- * @param uri_query
- * @return string for encoding
- */
- private static String buildMsgFrom(String uri_path, String uri_query, int port) {
- return String.valueOf(port) + uri_path + "?" + uri_query;
- }
-
-
- /**
- * byte array to Hex String
- * @param ba
- * @return string with HEX value of the key
- */
- public static String toHex(byte[] ba) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PrintStream ps = new PrintStream(baos);
- for(byte b: ba) {
- ps.printf("%x", b);
- }
- return baos.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java b/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java
deleted file mode 100644
index 5c42d22..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.security;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.tez.common.TezJobConfig;
-
-
-/**
- * This class provides user facing APIs for transferring secrets from
- * the job client to the tasks.
- * The secrets can be stored just before submission of jobs and read during
- * the task execution.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class TokenCache {
-
- private static final Log LOG = LogFactory.getLog(TokenCache.class);
-
-
- /**
- * auxiliary method to get user's secret keys..
- * @param alias
- * @return secret key from the storage
- */
- public static byte[] getSecretKey(Credentials credentials, Text alias) {
- if(credentials == null)
- return null;
- return credentials.getSecretKey(alias);
- }
-
- /**
- * Convenience method to obtain delegation tokens from namenodes
- * corresponding to the paths passed.
- * @param credentials
- * @param ps array of paths
- * @param conf configuration
- * @throws IOException
- */
- public static void obtainTokensForNamenodes(Credentials credentials,
- Path[] ps, Configuration conf) throws IOException {
- if (!UserGroupInformation.isSecurityEnabled()) {
- return;
- }
- obtainTokensForNamenodesInternal(credentials, ps, conf);
- }
-
- /**
- * Remove jobtoken referrals which don't make sense in the context
- * of the task execution.
- *
- * @param conf
- */
- public static void cleanUpTokenReferral(Configuration conf) {
- conf.unset(TezJobConfig.DAG_CREDENTIALS_BINARY);
- }
-
- static void obtainTokensForNamenodesInternal(Credentials credentials,
- Path[] ps, Configuration conf) throws IOException {
- Set<FileSystem> fsSet = new HashSet<FileSystem>();
- for(Path p: ps) {
- fsSet.add(p.getFileSystem(conf));
- }
- for (FileSystem fs : fsSet) {
- obtainTokensForNamenodesInternal(fs, credentials, conf);
- }
- }
-
- /**
- * get delegation token for a specific FS
- * @param fs
- * @param credentials
- * @param p
- * @param conf
- * @throws IOException
- */
- static void obtainTokensForNamenodesInternal(FileSystem fs,
- Credentials credentials, Configuration conf) throws IOException {
- String delegTokenRenewer = Master.getMasterPrincipal(conf);
- if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
- throw new IOException(
- "Can't get Master Kerberos principal for use as renewer");
- }
- mergeBinaryTokens(credentials, conf);
-
- final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer,
- credentials);
- if (tokens != null) {
- for (Token<?> token : tokens) {
- LOG.info("Got dt for " + fs.getUri() + "; "+token);
- }
- }
- }
-
- private static void mergeBinaryTokens(Credentials creds, Configuration conf) {
- String binaryTokenFilename =
- conf.get(TezJobConfig.DAG_CREDENTIALS_BINARY);
- if (binaryTokenFilename != null) {
- Credentials binary;
- try {
- binary = Credentials.readTokenStorageFile(
- new Path("file:///" + binaryTokenFilename), conf);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- // supplement existing tokens with the tokens in the binary file
- creds.mergeAll(binary);
- }
- }
-
- /**
- * file name used on HDFS for generated job token
- */
- @InterfaceAudience.Private
- public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
-
- /**
- * conf setting for job tokens cache file name
- */
- @InterfaceAudience.Private
- public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
- private static final Text JOB_TOKEN = new Text("JobToken");
- private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
-
- /**
- * load job token from a file
- * @param conf
- * @throws IOException
- */
- @InterfaceAudience.Private
- public static Credentials loadTokens(String jobTokenFile, Configuration conf)
- throws IOException {
- Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
-
- Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("Task: Loaded jobTokenFile from: "+
- localJobTokenFile.toUri().getPath()
- +"; num of sec keys = " + ts.numberOfSecretKeys() +
- " Number of tokens " + ts.numberOfTokens());
- }
- return ts;
- }
- /**
- * store job token
- * @param t
- */
- @InterfaceAudience.Private
- public static void setJobToken(Token<? extends TokenIdentifier> t,
- Credentials credentials) {
- credentials.addToken(JOB_TOKEN, t);
- }
- /**
- *
- * @return job token
- */
- @SuppressWarnings("unchecked")
- @InterfaceAudience.Private
- public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
- return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
- }
-
- @InterfaceAudience.Private
- public static void setShuffleSecretKey(byte[] key, Credentials credentials) {
- credentials.addSecretKey(SHUFFLE_TOKEN, key);
- }
-
- @InterfaceAudience.Private
- public static byte[] getShuffleSecretKey(Credentials credentials) {
- return getSecretKey(credentials, SHUFFLE_TOKEN);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ExceptionReporter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ExceptionReporter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ExceptionReporter.java
deleted file mode 100644
index 89a5888..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ExceptionReporter.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-/**
- * An interface for reporting exceptions to other threads
- */
-interface ExceptionReporter {
- void reportException(Throwable t);
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
deleted file mode 100644
index b48bb0b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
+++ /dev/null
@@ -1,624 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLConnection;
-import java.security.GeneralSecurityException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import javax.crypto.SecretKey;
-import javax.net.ssl.HttpsURLConnection;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.security.SecureShuffleUtils;
-import org.apache.tez.engine.common.shuffle.impl.MapOutput.Type;
-import org.apache.tez.engine.common.sort.impl.IFileInputStream;
-
-import com.google.common.annotations.VisibleForTesting;
-
-class Fetcher extends Thread {
-
- private static final Log LOG = LogFactory.getLog(Fetcher.class);
-
- /** Basic/unit connection timeout (in milliseconds) */
- private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;
-
- private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
- CONNECTION, WRONG_REDUCE}
-
- private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
- private final TezCounter connectionErrs;
- private final TezCounter ioErrs;
- private final TezCounter wrongLengthErrs;
- private final TezCounter badIdErrs;
- private final TezCounter wrongMapErrs;
- private final TezCounter wrongReduceErrs;
- private final MergeManager merger;
- private final ShuffleScheduler scheduler;
- private final ShuffleClientMetrics metrics;
- private final Shuffle shuffle;
- private final int id;
- private static int nextId = 0;
-
- private final int connectionTimeout;
- private final int readTimeout;
-
- // Decompression of map-outputs
- private final CompressionCodec codec;
- private final Decompressor decompressor;
- private final SecretKey jobTokenSecret;
-
- private volatile boolean stopped = false;
-
- private Configuration job;
-
- private static boolean sslShuffle;
- private static SSLFactory sslFactory;
-
- public Fetcher(Configuration job,
- ShuffleScheduler scheduler, MergeManager merger,
- ShuffleClientMetrics metrics,
- Shuffle shuffle, SecretKey jobTokenSecret, TezInputContext inputContext) throws IOException {
- this.job = job;
- this.scheduler = scheduler;
- this.merger = merger;
- this.metrics = metrics;
- this.shuffle = shuffle;
- this.id = ++nextId;
- this.jobTokenSecret = jobTokenSecret;
- ioErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
- ShuffleErrors.IO_ERROR.toString());
- wrongLengthErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
- ShuffleErrors.WRONG_LENGTH.toString());
- badIdErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
- ShuffleErrors.BAD_ID.toString());
- wrongMapErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
- ShuffleErrors.WRONG_MAP.toString());
- connectionErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
- ShuffleErrors.CONNECTION.toString());
- wrongReduceErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
- ShuffleErrors.WRONG_REDUCE.toString());
-
- if (ConfigUtils.isIntermediateInputCompressed(job)) {
- Class<? extends CompressionCodec> codecClass =
- ConfigUtils.getIntermediateInputCompressorClass(job, DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, job);
- decompressor = CodecPool.getDecompressor(codec);
- } else {
- codec = null;
- decompressor = null;
- }
-
- this.connectionTimeout =
- job.getInt(TezJobConfig.TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_STALLED_COPY_TIMEOUT);
- this.readTimeout =
- job.getInt(TezJobConfig.TEZ_ENGINE_SHUFFLE_READ_TIMEOUT,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_READ_TIMEOUT);
-
- setName("fetcher#" + id);
- setDaemon(true);
-
- synchronized (Fetcher.class) {
- sslShuffle = job.getBoolean(TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_SSL,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_SSL);
- if (sslShuffle && sslFactory == null) {
- sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, job);
- try {
- sslFactory.init();
- } catch (Exception ex) {
- sslFactory.destroy();
- throw new RuntimeException(ex);
- }
- }
- }
- }
-
- public void run() {
- try {
- while (!stopped && !Thread.currentThread().isInterrupted()) {
- MapHost host = null;
- try {
- // If merge is on, block
- merger.waitForInMemoryMerge();
-
- // Get a host to shuffle from
- host = scheduler.getHost();
- metrics.threadBusy();
-
- // Shuffle
- copyFromHost(host);
- } finally {
- if (host != null) {
- scheduler.freeHost(host);
- metrics.threadFree();
- }
- }
- }
- } catch (InterruptedException ie) {
- return;
- } catch (Throwable t) {
- shuffle.reportException(t);
- }
- }
-
- public void shutDown() throws InterruptedException {
- this.stopped = true;
- interrupt();
- try {
- join(5000);
- } catch (InterruptedException ie) {
- LOG.warn("Got interrupt while joining " + getName(), ie);
- }
- if (sslFactory != null) {
- sslFactory.destroy();
- }
- }
-
- @VisibleForTesting
- protected HttpURLConnection openConnection(URL url) throws IOException {
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- if (sslShuffle) {
- HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
- try {
- httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
- } catch (GeneralSecurityException ex) {
- throw new IOException(ex);
- }
- httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
- }
- return conn;
- }
-
- /**
- * The crux of the matter...
- *
- * @param host {@link MapHost} from which we need to
- * shuffle available map-outputs.
- */
- @VisibleForTesting
- protected void copyFromHost(MapHost host) throws IOException {
- // Get completed maps on 'host'
- List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
-
- // Sanity check to catch hosts with only 'OBSOLETE' maps,
- // especially at the tail of large jobs
- if (srcAttempts.size() == 0) {
- return;
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
- + srcAttempts);
- }
-
- // List of maps to be fetched yet
- Set<InputAttemptIdentifier> remaining = new HashSet<InputAttemptIdentifier>(srcAttempts);
-
- // Construct the url and connect
- DataInputStream input;
- boolean connectSucceeded = false;
-
- try {
- URL url = getMapOutputURL(host, srcAttempts);
- HttpURLConnection connection = openConnection(url);
-
- // generate hash of the url
- String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
- String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
-
- // put url hash into http header
- connection.addRequestProperty(
- SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
- // set the read timeout
- connection.setReadTimeout(readTimeout);
- // put shuffle version into http header
- connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
- ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
- connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
- ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
- connect(connection, connectionTimeout);
- connectSucceeded = true;
- input = new DataInputStream(connection.getInputStream());
-
- // Validate response code
- int rc = connection.getResponseCode();
- if (rc != HttpURLConnection.HTTP_OK) {
- throw new IOException(
- "Got invalid response code " + rc + " from " + url +
- ": " + connection.getResponseMessage());
- }
- // get the shuffle version
- if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
- connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
- || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
- connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
- throw new IOException("Incompatible shuffle response version");
- }
- // get the replyHash which is HMac of the encHash we sent to the server
- String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
- if(replyHash==null) {
- throw new IOException("security validation of TT Map output failed");
- }
- LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
- // verify that replyHash is HMac of encHash
- SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
- LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
- } catch (IOException ie) {
- ioErrs.increment(1);
- LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
- " map outputs", ie);
-
- // If connect did not succeed, just mark all the maps as failed,
- // indirectly penalizing the host
- if (!connectSucceeded) {
- for(InputAttemptIdentifier left: remaining) {
- scheduler.copyFailed(left, host, connectSucceeded);
- }
- } else {
- // If we got a read error at this stage, it implies there was a problem
- // with the first map, typically lost map. So, penalize only that map
- // and add the rest
- InputAttemptIdentifier firstMap = srcAttempts.get(0);
- scheduler.copyFailed(firstMap, host, connectSucceeded);
- }
-
- // Add back all the remaining maps, WITHOUT marking them as failed
- for(InputAttemptIdentifier left: remaining) {
- // TODO Should the first one be skipped ?
- scheduler.putBackKnownMapOutput(host, left);
- }
-
- return;
- }
-
- try {
- // Loop through available map-outputs and fetch them
- // On any error, faildTasks is not null and we exit
- // after putting back the remaining maps to the
- // yet_to_be_fetched list and marking the failed tasks.
- InputAttemptIdentifier[] failedTasks = null;
- while (!remaining.isEmpty() && failedTasks == null) {
- failedTasks = copyMapOutput(host, input, remaining);
- }
-
- if(failedTasks != null && failedTasks.length > 0) {
- LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
- for(InputAttemptIdentifier left: failedTasks) {
- scheduler.copyFailed(left, host, true);
- }
- }
-
- IOUtils.cleanup(LOG, input);
-
- // Sanity check
- if (failedTasks == null && !remaining.isEmpty()) {
- throw new IOException("server didn't return all expected map outputs: "
- + remaining.size() + " left.");
- }
- } finally {
- for (InputAttemptIdentifier left : remaining) {
- scheduler.putBackKnownMapOutput(host, left);
- }
- }
- }
-
- private static InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0];
-
- private InputAttemptIdentifier[] copyMapOutput(MapHost host,
- DataInputStream input,
- Set<InputAttemptIdentifier> remaining) {
- MapOutput mapOutput = null;
- InputAttemptIdentifier srcAttemptId = null;
- long decompressedLength = -1;
- long compressedLength = -1;
-
- try {
- long startTime = System.currentTimeMillis();
- int forReduce = -1;
- //Read the shuffle header
- try {
- ShuffleHeader header = new ShuffleHeader();
- header.readFields(input);
- String pathComponent = header.mapId;
- srcAttemptId = scheduler.getIdentifierForPathComponent(pathComponent);
- compressedLength = header.compressedLength;
- decompressedLength = header.uncompressedLength;
- forReduce = header.forReduce;
- } catch (IllegalArgumentException e) {
- badIdErrs.increment(1);
- LOG.warn("Invalid map id ", e);
- //Don't know which one was bad, so consider all of them as bad
- return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
- }
-
-
- // Do some basic sanity verification
- if (!verifySanity(compressedLength, decompressedLength, forReduce,
- remaining, srcAttemptId)) {
- return new InputAttemptIdentifier[] {srcAttemptId};
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength +
- ", decomp len: " + decompressedLength);
- }
-
- // Get the location for the map output - either in-memory or on-disk
- mapOutput = merger.reserve(srcAttemptId, decompressedLength, id);
-
- // Check if we can shuffle *now* ...
- if (mapOutput.getType() == Type.WAIT) {
- LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
- //Not an error but wait to process data.
- return EMPTY_ATTEMPT_ID_ARRAY;
- }
-
- // Go!
- LOG.info("fetcher#" + id + " about to shuffle output of map " +
- mapOutput.getAttemptIdentifier() + " decomp: " +
- decompressedLength + " len: " + compressedLength + " to " +
- mapOutput.getType());
- if (mapOutput.getType() == Type.MEMORY) {
- shuffleToMemory(host, mapOutput, input,
- (int) decompressedLength, (int) compressedLength);
- } else {
- shuffleToDisk(host, mapOutput, input, compressedLength);
- }
-
- // Inform the shuffle scheduler
- long endTime = System.currentTimeMillis();
- scheduler.copySucceeded(srcAttemptId, host, compressedLength,
- endTime - startTime, mapOutput);
- // Note successful shuffle
- remaining.remove(srcAttemptId);
- metrics.successFetch();
- return null;
- } catch (IOException ioe) {
- ioErrs.increment(1);
- if (srcAttemptId == null || mapOutput == null) {
- LOG.info("fetcher#" + id + " failed to read map header" +
- srcAttemptId + " decomp: " +
- decompressedLength + ", " + compressedLength, ioe);
- if(srcAttemptId == null) {
- return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
- } else {
- return new InputAttemptIdentifier[] {srcAttemptId};
- }
- }
-
- LOG.warn("Failed to shuffle output of " + srcAttemptId +
- " from " + host.getHostName(), ioe);
-
- // Inform the shuffle-scheduler
- mapOutput.abort();
- metrics.failedFetch();
- return new InputAttemptIdentifier[] {srcAttemptId};
- }
-
- }
-
- /**
-  * Defensively validates the values parsed from a map-output header.
-  * <p>
-  * Three checks run in order and the first failure wins: both lengths must be
-  * non-negative, {@code forReduce} must lie in
-  * {@code [reduceStartId, reduceStartId + reduceRange)} as reported by
-  * {@code shuffle}, and {@code srcAttemptId} must be a member of
-  * {@code remaining}. Every failure increments its own error counter and logs
-  * a warning.
-  *
-  * @param compressedLength compressed length taken from the header
-  * @param decompressedLength decompressed length taken from the header
-  * @param forReduce reduce (partition) id taken from the header
-  * @param remaining attempts that are still expected
-  * @param srcAttemptId attempt the header claims to describe
-  * @return true when all checks pass, false as soon as one fails
-  */
- private boolean verifySanity(long compressedLength, long decompressedLength,
-     int forReduce, Set<InputAttemptIdentifier> remaining, InputAttemptIdentifier srcAttemptId) {
-   final boolean lengthsOk = compressedLength >= 0 && decompressedLength >= 0;
-   if (!lengthsOk) {
-     wrongLengthErrs.increment(1);
-     LOG.warn(getName() + " invalid lengths in map output header: id: " +
-         srcAttemptId + " len: " + compressedLength + ", decomp len: " +
-         decompressedLength);
-     return false;
-   }
-
-   // Half-open window of reduce ids that this shuffle accepts.
-   final int firstReduce = shuffle.getReduceStartId();
-   final int endReduce = firstReduce + shuffle.getReduceRange();
-   final boolean reduceOk = forReduce >= firstReduce && forReduce < endReduce;
-   if (!reduceOk) {
-     wrongReduceErrs.increment(1);
-     LOG.warn(getName() + " data for the wrong reduce map: " +
-         srcAttemptId + " len: " + compressedLength + " decomp len: " +
-         decompressedLength + " for reduce " + forReduce);
-     return false;
-   }
-
-   // The attempt must be one we are still waiting for.
-   if (remaining.contains(srcAttemptId)) {
-     return true;
-   }
-   wrongMapErrs.increment(1);
-   LOG.warn("Invalid map-output! Received output for " + srcAttemptId);
-   return false;
- }
-
- /**
-  * Builds the URL used to request several map outputs from one host.
-  * The result is the host's base URL followed by the path component of each
-  * requested attempt, with a comma between consecutive attempts.
-  *
-  * @param host host whose base URL starts the request
-  * @param srcAttempts attempts to request, appended in iteration order
-  * @return the assembled fetch URL
-  * @throws MalformedURLException if the assembled string is not a valid URL
-  */
- private URL getMapOutputURL(MapHost host, List<InputAttemptIdentifier> srcAttempts
-     ) throws MalformedURLException {
-   // The buffer never leaves this method, so the unsynchronized
-   // StringBuilder is sufficient.
-   StringBuilder url = new StringBuilder(host.getBaseUrl());
-
-   boolean first = true;
-   for (InputAttemptIdentifier mapId : srcAttempts) {
-     if (!first) {
-       url.append(",");
-     }
-     url.append(mapId.getPathComponent());
-     first = false;
-   }
-
-   final String urlString = url.toString();
-   if (LOG.isDebugEnabled()) {
-     LOG.debug("MapOutput URL for " + host + " -> " + urlString);
-   }
-   return new URL(urlString);
- }
-
- /**
-  * Connects with a budgeted timeout split into short attempts.
-  * <p>
-  * Instead of one attempt with a timeout of X, the connection is tried
-  * repeatedly with a per-attempt timeout of at most
-  * {@code UNIT_CONNECT_TIMEOUT}. The last attempt is shortened to whatever
-  * budget is left, and the failure of the attempt that exhausts the budget is
-  * rethrown. A budget of 0 is handed to {@code setConnectTimeout} unchanged,
-  * and the first failure is then rethrown.
-  *
-  * @param connection connection to open
-  * @param connectionTimeout total connect budget in milliseconds; must not be
-  *        negative
-  * @throws IOException if the budget is negative, or if the attempt that
-  *         uses up the budget fails
-  */
- private void connect(URLConnection connection, int connectionTimeout)
-     throws IOException {
-   if (connectionTimeout < 0) {
-     throw new IOException("Invalid timeout "
-         + "[timeout = " + connectionTimeout + " ms]");
-   }
-
-   int budgetMs = connectionTimeout;
-   int attemptMs = (budgetMs > 0) ? Math.min(UNIT_CONNECT_TIMEOUT, budgetMs) : 0;
-
-   // Start with the per-attempt timeout.
-   connection.setConnectTimeout(attemptMs);
-   for (;;) {
-     try {
-       connection.connect();
-       return;
-     } catch (IOException ioe) {
-       // Charge the failed attempt against the budget.
-       budgetMs -= attemptMs;
-
-       // Give up once the whole budget has been spent; note that the
-       // already-reduced budget is what is tested here.
-       if (budgetMs == 0) {
-         throw ioe;
-       }
-
-       // Less than one full attempt left: shrink the final attempt to fit.
-       if (budgetMs < attemptMs) {
-         attemptMs = budgetMs;
-         connection.setConnectTimeout(attemptMs);
-       }
-     }
-   }
- }
-
- /**
-  * Reads one map output from {@code input} into the byte array returned by
-  * {@code mapOutput.getMemory()}.
-  * <p>
-  * The stream is wrapped in an {@code IFileInputStream} and, when a codec is
-  * configured, in that codec's decompressing stream. Exactly as many bytes as
-  * the array holds are read. On a read failure the wrapped stream is closed
-  * and the exception is rethrown. {@code host} and
-  * {@code decompressedLength} are not used by this method.
-  *
-  * @param host source host (unused)
-  * @param mapOutput destination holding the in-memory buffer
-  * @param input stream positioned at the start of the map output
-  * @param decompressedLength decompressed size (unused)
-  * @param compressedLength length handed to the {@code IFileInputStream}
-  * @throws IOException if the buffer cannot be filled completely
-  */
- private void shuffleToMemory(MapHost host, MapOutput mapOutput,
-     InputStream input,
-     int decompressedLength,
-     int compressedLength) throws IOException {
-   InputStream source = new IFileInputStream(input, compressedLength, job);
-
-   // Are map-outputs compressed?
-   if (codec != null) {
-     decompressor.reset();
-     source = codec.createInputStream(source, decompressor);
-   }
-
-   // Destination buffer for the map-output
-   final byte[] target = mapOutput.getMemory();
-
-   try {
-     IOUtils.readFully(source, target, 0, target.length);
-     metrics.inputBytes(target.length);
-     LOG.info("Read " + target.length + " bytes from map-output for " +
-         mapOutput.getAttemptIdentifier());
-   } catch (IOException ioe) {
-     // Release the stream chain before propagating the failure
-     IOUtils.cleanup(LOG, source);
-     throw ioe;
-   }
- }
-
- /**
-  * Copies {@code compressedLength} bytes from {@code input} to the stream
-  * returned by {@code mapOutput.getDisk()}, in chunks of at most 64 KiB.
-  * <p>
-  * The disk stream is closed once the copy completes; {@code input} is left
-  * open on success. If anything in the copy throws an {@code IOException},
-  * both streams are closed and the exception is rethrown.
-  *
-  * @param host source host, used only in the incomplete-output message
-  * @param mapOutput destination providing the disk stream
-  * @param input stream positioned at the start of the map output
-  * @param compressedLength number of bytes to copy
-  * @throws IOException if the stream ends early or a read/write fails
-  */
- private void shuffleToDisk(MapHost host, MapOutput mapOutput,
-     InputStream input,
-     long compressedLength)
-     throws IOException {
-   final OutputStream diskOut = mapOutput.getDisk();
-   long outstanding = compressedLength;
-   try {
-     final int chunkSize = 64 * 1024;
-     final byte[] chunk = new byte[chunkSize];
-     while (outstanding > 0) {
-       final int wanted = (int) Math.min(outstanding, chunkSize);
-       final int got = input.read(chunk, 0, wanted);
-       if (got < 0) {
-         throw new IOException("read past end of stream reading " +
-             mapOutput.getAttemptIdentifier());
-       }
-       diskOut.write(chunk, 0, got);
-       outstanding -= got;
-       metrics.inputBytes(got);
-     }
-
-     LOG.info("Read " + (compressedLength - outstanding) +
-         " bytes from map-output for " +
-         mapOutput.getAttemptIdentifier());
-
-     diskOut.close();
-   } catch (IOException ioe) {
-     // Release both ends before propagating the failure
-     IOUtils.cleanup(LOG, input, diskOut);
-     throw ioe;
-   }
-
-   // Defensive: the loop above should only finish with nothing outstanding
-   if (outstanding != 0) {
-     throw new IOException("Incomplete map output received for " +
-         mapOutput.getAttemptIdentifier() + " from " +
-         host.getHostName() + " (" +
-         outstanding + " bytes missing of " +
-         compressedLength + ")"
-     );
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
deleted file mode 100644
index 1beed44..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.common.sort.impl.IFile.Reader;
-
-/**
- * A <code>Reader</code> over a map output that is already held in a byte
- * array: keys and values are exposed as slices of that array through
- * {@link DataInputBuffer}s rather than being copied out of a stream.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class InMemoryReader extends Reader {
-  private final InputAttemptIdentifier taskAttemptId;
-  private final MergeManager merger;
-  DataInputBuffer memDataIn = new DataInputBuffer();
-  private int start;
-  private int length;
-  // Offset of the most recent new key, reused when a repeated key is read.
-  private int prevKeyPos;
-
-  /**
-   * @param merger notified in {@link #close()} so it can unreserve the
-   *        buffer; may be null
-   * @param taskAttemptId attempt the data belongs to (used in error dumps)
-   * @param data array holding the map output
-   * @param start offset of the first byte in {@code data}
-   * @param length passed to the superclass as {@code length - start} and to
-   *        {@code memDataIn.reset} as-is
-   * @throws IOException declared by the superclass constructor
-   */
-  public InMemoryReader(MergeManager merger, InputAttemptIdentifier taskAttemptId,
-      byte[] data, int start, int length)
-      throws IOException {
-    super(null, null, length - start, null, null);
-    this.merger = merger;
-    this.taskAttemptId = taskAttemptId;
-
-    buffer = data;
-    bufferSize = (int)fileLength;
-    // NOTE(review): `length` is treated as an end offset above
-    // (length - start) but as a byte count here; the two agree only when
-    // start == 0 -- confirm against callers before relying on start > 0.
-    memDataIn.reset(buffer, start, length);
-    this.start = start;
-    this.length = length;
-  }
-
-  /**
-   * Repositions the reader {@code offset} bytes past {@code start} and clears
-   * the end-of-file flag.
-   */
-  @Override
-  public void reset(int offset) {
-    memDataIn.reset(buffer, start + offset, length);
-    bytesRead = offset;
-    eof = false;
-  }
-
-  @Override
-  public long getPosition() throws IOException {
-    // InMemoryReader does not initialize streams like Reader, so in.getPos()
-    // would not work. Instead, return the number of uncompressed bytes read,
-    // which will be correct since in-memory data is not compressed.
-    return bytesRead;
-  }
-
-  @Override
-  public long getLength() {
-    return fileLength;
-  }
-
-  /**
-   * Best-effort dump of the buffer to {@code ../output/<attempt>.dump} for
-   * post-mortem inspection. Failures are reported on stderr and otherwise
-   * ignored so that the original read error is the one that propagates.
-   */
-  private void dumpOnError() {
-    File dumpFile = new File("../output/" + taskAttemptId + ".dump");
-    System.err.println("Dumping corrupt map-output of " + taskAttemptId +
-        " to " + dumpFile.getAbsolutePath());
-    try {
-      FileOutputStream fos = new FileOutputStream(dumpFile);
-      try {
-        fos.write(buffer, 0, bufferSize);
-      } finally {
-        // Close even when the write fails so the file handle is not leaked.
-        fos.close();
-      }
-    } catch (IOException ioe) {
-      System.err.println("Failed to dump map-output of " + taskAttemptId);
-    }
-  }
-
-  /**
-   * Points {@code key} at the next key in the buffer.
-   *
-   * @return {@code NO_KEY} when there is no further record, {@code SAME_KEY}
-   *         when the record carries the repeat marker (then {@code key} is
-   *         pointed at the previous key's bytes), {@code NEW_KEY} otherwise
-   * @throws IOException if the key cannot be skipped; the buffer is dumped
-   *         before the exception is rethrown
-   */
-  public KeyState readRawKey(DataInputBuffer key) throws IOException {
-    try {
-      if (!positionToNextRecord(memDataIn)) {
-        return KeyState.NO_KEY;
-      }
-      // Setup the key
-      int pos = memDataIn.getPosition();
-      byte[] data = memDataIn.getData();
-      if (currentKeyLength == IFile.RLE_MARKER) {
-        // Repeated key: no key bytes follow, so reuse the previous slice.
-        key.reset(data, prevKeyPos, prevKeyLength);
-        currentKeyLength = prevKeyLength;
-        return KeyState.SAME_KEY;
-      }
-      key.reset(data, pos, currentKeyLength);
-      prevKeyPos = pos;
-      // Position for the next value
-      long skipped = memDataIn.skip(currentKeyLength);
-      if (skipped != currentKeyLength) {
-        throw new IOException("Rec# " + recNo +
-            ": Failed to skip past key of length: " +
-            currentKeyLength);
-      }
-
-      // Account for the key bytes
-      bytesRead += currentKeyLength;
-      return KeyState.NEW_KEY;
-    } catch (IOException ioe) {
-      dumpOnError();
-      throw ioe;
-    }
-  }
-
-  /**
-   * Points {@code value} at the current record's value and advances past it.
-   *
-   * @throws IOException if the value cannot be skipped; the buffer is dumped
-   *         before the exception is rethrown
-   */
-  public void nextRawValue(DataInputBuffer value) throws IOException {
-    try {
-      int pos = memDataIn.getPosition();
-      byte[] data = memDataIn.getData();
-      value.reset(data, pos, currentValueLength);
-
-      // Position for the next record
-      long skipped = memDataIn.skip(currentValueLength);
-      if (skipped != currentValueLength) {
-        throw new IOException("Rec# " + recNo +
-            ": Failed to skip past value of length: " +
-            currentValueLength);
-      }
-      // Account for the value bytes
-      bytesRead += currentValueLength;
-
-      ++recNo;
-    } catch (IOException ioe) {
-      dumpOnError();
-      throw ioe;
-    }
-  }
-
-  /**
-   * Drops the references to the data and, when a merger was supplied, tells
-   * it to unreserve {@code bufferSize} bytes.
-   */
-  public void close() {
-    // Release
-    dataIn = null;
-    buffer = null;
-    // Inform the MergeManager
-    if (merger != null) {
-      merger.unreserve(bufferSize);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java
deleted file mode 100644
index b3ebb8b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryWriter.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.common.sort.impl.IFileOutputStream;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-
-/**
- * A <code>Writer</code> that serializes raw key/value records into a
- * {@link BoundedByteArrayOutputStream} through an
- * <code>IFileOutputStream</code>.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class InMemoryWriter extends Writer {
-  private static final Log LOG = LogFactory.getLog(InMemoryWriter.class);
-
-  private DataOutputStream out;
-
-  /**
-   * @param arrayStream in-memory destination for the written records
-   */
-  public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
-    super(null);
-    IFileOutputStream ifileStream = new IFileOutputStream(arrayStream);
-    this.out = new DataOutputStream(ifileStream);
-  }
-
-  /**
-   * Not supported: this writer only accepts already-serialized records.
-   *
-   * @throws UnsupportedOperationException always
-   */
-  public void append(Object key, Object value) throws IOException {
-    throw new UnsupportedOperationException
-        ("InMemoryWriter.append(K key, V value");
-  }
-
-  /**
-   * Writes one record taken from the unread portions of {@code key} and
-   * {@code value}.
-   * <p>
-   * When {@code key} is the {@code IFile.REPEAT_KEY} sentinel, the repeat
-   * marker is written in place of the key length and no key bytes are
-   * emitted.
-   *
-   * @throws IOException if either buffer has a negative remaining length, or
-   *         if the underlying stream fails
-   */
-  public void append(DataInputBuffer key, DataInputBuffer value)
-      throws IOException {
-    final int keyLength = key.getLength() - key.getPosition();
-    if (keyLength < 0) {
-      throw new IOException("Negative key-length not allowed: " + keyLength +
-          " for " + key);
-    }
-
-    final int valueLength = value.getLength() - value.getPosition();
-    if (valueLength < 0) {
-      throw new IOException("Negative value-length not allowed: " +
-          valueLength + " for " + value);
-    }
-
-    // Identity comparison on purpose: REPEAT_KEY is a sentinel instance.
-    final boolean repeatedKey = (key == IFile.REPEAT_KEY);
-
-    if (!repeatedKey && LOG.isDebugEnabled()) {
-      LOG.debug("InMemWriter.append" +
-          " key.data=" + key.getData() +
-          " key.pos=" + key.getPosition() +
-          " key.len=" + key.getLength() +
-          " val.data=" + value.getData() +
-          " val.pos=" + value.getPosition() +
-          " val.len=" + value.getLength());
-    }
-
-    // Header: key length (or the repeat marker), then value length.
-    WritableUtils.writeVInt(out, repeatedKey ? IFile.RLE_MARKER : keyLength);
-    WritableUtils.writeVInt(out, valueLength);
-
-    // Payload: key bytes only for a new key, then the value bytes.
-    if (!repeatedKey) {
-      out.write(key.getData(), key.getPosition(), keyLength);
-    }
-    out.write(value.getData(), value.getPosition(), valueLength);
-  }
-
-  /**
-   * Terminates the record stream with an EOF marker for both the key length
-   * and the value length, then closes and drops the underlying stream.
-   */
-  public void close() throws IOException {
-    for (int marker = 0; marker < 2; marker++) {
-      WritableUtils.writeVInt(out, IFile.EOF_MARKER);
-    }
-
-    out.close();
-    out = null;
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
deleted file mode 100644
index 20ec472..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-/**
- * Tracks, for one host/partition pair, the fetch state and the list of
- * source attempts that are known but not yet handed out for fetching.
- * All access to the mutable state ({@code state} and {@code maps}) goes
- * through the instance monitor.
- */
-@Private
-class MapHost {
-
-  public static enum State {
-    IDLE,               // No map outputs available
-    BUSY,               // Map outputs are being fetched
-    PENDING,            // Known map outputs which need to be fetched
-    PENALIZED           // Host penalized due to shuffle failures
-  }
-
-  private State state = State.IDLE;
-  private final String hostName;
-  private final int partitionId;
-  private final String baseUrl;
-  private final String identifier;
-  // Tracks attempt IDs
-  private List<InputAttemptIdentifier> maps = new ArrayList<InputAttemptIdentifier>();
-
-  public MapHost(int partitionId, String hostName, String baseUrl) {
-    this.partitionId = partitionId;
-    this.hostName = hostName;
-    this.baseUrl = baseUrl;
-    this.identifier = createIdentifier(hostName, partitionId);
-  }
-
-  /**
-   * @return {@code hostName + ":" + partitionId}
-   */
-  public static String createIdentifier(String hostName, int partitionId) {
-    return hostName + ":" + Integer.toString(partitionId);
-  }
-
-  public String getIdentifier() {
-    return identifier;
-  }
-
-  public int getPartitionId() {
-    return partitionId;
-  }
-
-  /**
-   * @return the current state; synchronized like every writer of
-   *         {@code state} so that the latest value is always observed
-   */
-  public synchronized State getState() {
-    return state;
-  }
-
-  public String getHostName() {
-    return hostName;
-  }
-
-  public String getBaseUrl() {
-    return baseUrl;
-  }
-
-  /**
-   * Records a newly known attempt; an {@code IDLE} host becomes
-   * {@code PENDING}, any other state is left unchanged.
-   */
-  public synchronized void addKnownMap(InputAttemptIdentifier srcAttempt) {
-    maps.add(srcAttempt);
-    if (state == State.IDLE) {
-      state = State.PENDING;
-    }
-  }
-
-  /**
-   * Hands the accumulated attempts to the caller and starts a fresh, empty
-   * list. The state is not changed by this call.
-   *
-   * @return the attempts known so far; the caller owns the returned list
-   */
-  public synchronized List<InputAttemptIdentifier> getAndClearKnownMaps() {
-    List<InputAttemptIdentifier> currentKnownMaps = maps;
-    maps = new ArrayList<InputAttemptIdentifier>();
-    return currentKnownMaps;
-  }
-
-  public synchronized void markBusy() {
-    state = State.BUSY;
-  }
-
-  public synchronized void markPenalized() {
-    state = State.PENALIZED;
-  }
-
-  public synchronized int getNumKnownMapOutputs() {
-    return maps.size();
-  }
-
-  /**
-   * Called when the node is done with its penalty or done copying.
-   * @return the host's new state: {@code IDLE} when no attempts are queued,
-   *         {@code PENDING} otherwise
-   */
-  public synchronized State markAvailable() {
-    if (maps.isEmpty()) {
-      state = State.IDLE;
-    } else {
-      state = State.PENDING;
-    }
-    return state;
-  }
-
-  @Override
-  public String toString() {
-    return hostName;
-  }
-
-  /**
-   * Mark the host as penalized. Equivalent to {@link #markPenalized()}.
-   */
-  public synchronized void penalize() {
-    markPenalized();
-  }
-}