You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2016/09/08 15:40:10 UTC
tez git commit: TEZ-3284. Synchronization for every write in
UnorderdKVWriter (jeagles)
Repository: tez
Updated Branches:
refs/heads/master 91a397b0b -> 495e6f0a4
TEZ-3284. Synchronization for every write in UnorderdKVWriter (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/495e6f0a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/495e6f0a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/495e6f0a
Branch: refs/heads/master
Commit: 495e6f0a41c4e359c4e4f6786bea74f914d2c8b7
Parents: 91a397b
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Thu Sep 8 10:37:58 2016 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Thu Sep 8 10:38:13 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/common/TezUtilsInternal.java | 6 +-
.../common/io/NonSyncByteArrayInputStream.java | 99 ++++++++++++++++
.../common/io/NonSyncByteArrayOutputStream.java | 113 +++++++++++++++++++
.../tez/common/io/NonSyncDataOutputStream.java | 57 ++++++++++
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 8 +-
.../org/apache/tez/examples/JoinDataGen.java | 12 +-
.../tez/mapreduce/hadoop/MRInputHelpers.java | 4 +-
.../tez/history/parser/ATSFileParser.java | 4 +-
.../vertexmanager/ShuffleVertexManagerBase.java | 4 +-
.../common/shuffle/MemoryFetchedInput.java | 4 +-
.../shuffle/orderedgrouped/InMemoryReader.java | 4 +-
.../shuffle/orderedgrouped/InMemoryWriter.java | 4 +-
.../common/sort/impl/PipelinedSorter.java | 6 +-
.../common/sort/impl/dflt/DefaultSorter.java | 4 +-
.../writers/UnorderedPartitionedKVWriter.java | 6 +-
.../examples/MultipleCommitsExample.java | 12 +-
17 files changed, 309 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3f6281f..060d6d7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3284. Synchronization for every write in UnorderdKVWriter
TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases
TEZ-3230. Implement vertex manager and edge manager of cartesian product edge.
TEZ-3326. Display JVM system properties in AM and task logs.
http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 08a9aa8..b0b8906 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -17,7 +17,6 @@
package org.apache.tez.common;
-import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -41,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.log4j.Appender;
+import org.apache.tez.common.io.NonSyncByteArrayOutputStream;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -139,7 +139,7 @@ public class TezUtilsInternal {
private static byte[] compressBytesInflateDeflate(byte[] inBytes) {
Deflater deflater = new Deflater(Deflater.BEST_SPEED);
deflater.setInput(inBytes);
- ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length);
+ NonSyncByteArrayOutputStream bos = new NonSyncByteArrayOutputStream(inBytes.length);
deflater.finish();
byte[] buffer = new byte[1024 * 8];
while (!deflater.finished()) {
@@ -153,7 +153,7 @@ public class TezUtilsInternal {
private static byte[] uncompressBytesInflateDeflate(byte[] inBytes) throws IOException {
Inflater inflater = new Inflater();
inflater.setInput(inBytes);
- ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length);
+ NonSyncByteArrayOutputStream bos = new NonSyncByteArrayOutputStream(inBytes.length);
byte[] buffer = new byte[1024 * 8];
while (!inflater.finished()) {
int count;
http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayInputStream.java b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayInputStream.java
new file mode 100644
index 0000000..5b6e52a
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayInputStream.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common.io;
+
+import java.io.ByteArrayInputStream;
+
+/**
+ * A {@link ByteArrayInputStream} whose reading methods are overridden without
+ * the {@code synchronized} modifier. Instances are not thread-safe; callers
+ * sharing one across threads must provide their own locking.
+ */
+public class NonSyncByteArrayInputStream extends ByteArrayInputStream {
+  public NonSyncByteArrayInputStream(byte[] bs) {
+    super(bs);
+  }
+
+  public NonSyncByteArrayInputStream(byte[] buf, int offset, int length) {
+    super(buf, offset, length);
+  }
+
+  /** Returns the next byte as a value in 0..255, or -1 once pos reaches count. */
+  @Override
+  public int read() {
+    if (pos >= count) {
+      return -1;
+    }
+    int next = buf[pos] & 0xff;
+    pos++;
+    return next;
+  }
+
+  /**
+   * Copies up to {@code len} bytes into {@code b}, starting at {@code off}.
+   *
+   * @return the number of bytes copied, or -1 when no bytes remain
+   * @throws NullPointerException if {@code b} is null
+   * @throws IndexOutOfBoundsException if off/len do not fit inside {@code b}
+   */
+  @Override
+  public int read(byte b[], int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (pos >= count) {
+      return -1;
+    }
+    // Clamp the request to the bytes that are still unread.
+    int toCopy = Math.min(len, count - pos);
+    if (toCopy <= 0) {
+      return 0;
+    }
+    System.arraycopy(buf, pos, b, off, toCopy);
+    pos += toCopy;
+    return toCopy;
+  }
+
+  /**
+   * Advances the read position by at most {@code n} bytes.
+   *
+   * @return the number of bytes actually skipped
+   */
+  @Override
+  public long skip(long n) {
+    long remaining = count - pos;
+    long skipped = (n >= remaining) ? remaining : Math.max(n, 0L);
+    pos += skipped;
+    return skipped;
+  }
+
+  /** Returns the number of bytes between pos and count. */
+  @Override
+  public int available() {
+    return count - pos;
+  }
+
+  /** Moves the read position back to the marked position. */
+  @Override
+  public void reset() {
+    pos = mark;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayOutputStream.java b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayOutputStream.java
new file mode 100644
index 0000000..40fae6f
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayOutputStream.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+/**
+ * A {@link ByteArrayOutputStream} whose methods are overridden without the
+ * {@code synchronized} modifier. Instances are not thread-safe; callers
+ * sharing one across threads must provide their own locking.
+ */
+public class NonSyncByteArrayOutputStream extends ByteArrayOutputStream {
+  public NonSyncByteArrayOutputStream(int size) {
+    super(size);
+  }
+
+  public NonSyncByteArrayOutputStream() {
+    super();
+  }
+
+  /** Appends the low-order byte of {@code b}, growing the buffer if needed. */
+  @Override
+  public void write(int b) {
+    enLargeBuffer(1);
+    buf[count] = (byte) b;
+    count += 1;
+  }
+
+  /**
+   * Appends {@code len} bytes of {@code b}, starting at {@code off}.
+   *
+   * @throws IndexOutOfBoundsException if off/len do not fit inside {@code b}
+   */
+  @Override
+  public void write(byte b[], int off, int len) {
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return;
+    }
+    enLargeBuffer(len);
+    System.arraycopy(b, off, buf, count, len);
+    count += len;
+  }
+
+  /** Discards the written bytes by zeroing the count; the buffer is kept. */
+  @Override
+  public void reset() {
+    count = 0;
+  }
+
+  /**
+   * Appends exactly {@code length} bytes read from {@code in}.
+   *
+   * @throws IndexOutOfBoundsException if {@code length} is negative
+   * @throws IOException if {@code in.readFully} fails; count is then unchanged
+   */
+  public void write(DataInput in, int length) throws IOException {
+    if (length < 0) {
+      throw new IndexOutOfBoundsException("negative length: " + length);
+    }
+    enLargeBuffer(length);
+    in.readFully(buf, count, length);
+    count += length;
+  }
+
+  /**
+   * Ensures the buffer can hold {@code increment} more bytes, doubling it when
+   * doubling is enough and otherwise growing to exactly the required size.
+   *
+   * @throws OutOfMemoryError if count + increment overflows an int
+   */
+  private void enLargeBuffer(int increment) {
+    int required = count + increment;
+    if (required < 0) {
+      // Overflow: fail up front rather than with an obscure copy error later.
+      throw new OutOfMemoryError("Required buffer size exceeds int range");
+    }
+    if (required > buf.length) {
+      // buf.length << 1 may wrap negative; Math.max then picks required.
+      byte[] grown = new byte[Math.max(buf.length << 1, required)];
+      System.arraycopy(buf, 0, grown, 0, count);
+      buf = grown;
+    }
+  }
+
+  /** Writes the bytes written so far to {@code out}. */
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    out.write(buf, 0, count);
+  }
+
+  /** Returns a copy of the bytes written so far. */
+  @Override
+  public byte[] toByteArray() {
+    return Arrays.copyOf(buf, count);
+  }
+
+  /** Returns the number of bytes written so far. */
+  @Override
+  public int size() {
+    return count;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncDataOutputStream.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncDataOutputStream.java b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncDataOutputStream.java
new file mode 100644
index 0000000..d6302fe
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncDataOutputStream.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A {@link DataOutputStream} whose two {@code write} overrides drop the
+ * {@code synchronized} modifier. Instances are not thread-safe; callers
+ * sharing one across threads must provide their own locking.
+ */
+public class NonSyncDataOutputStream extends DataOutputStream {
+  public NonSyncDataOutputStream(OutputStream stream) {
+    super(stream);
+  }
+
+  /**
+   * Adds {@code delta} to the inherited {@code written} counter, pinning it
+   * at Integer.MAX_VALUE if the sum wraps negative.
+   */
+  private void advanceWritten(int delta) {
+    int total = written + delta;
+    written = (total < 0) ? Integer.MAX_VALUE : total;
+  }
+
+  /** Forwards one byte to the wrapped stream, then counts it. */
+  @Override
+  public void write(int b) throws IOException {
+    out.write(b);
+    advanceWritten(1);
+  }
+
+  /** Forwards {@code len} bytes of {@code b} from {@code off}, then counts them. */
+  @Override
+  public void write(byte b[], int off, int len) throws IOException {
+    out.write(b, off, len);
+    advanceWritten(len);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 01bca8f..1dd7756 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -17,8 +17,6 @@
package org.apache.tez.dag.app.dag.impl;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
@@ -65,6 +63,8 @@ import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.LimitExceededException;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.io.NonSyncByteArrayInputStream;
+import org.apache.tez.common.io.NonSyncByteArrayOutputStream;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
@@ -2583,7 +2583,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
+ ", use NoOpVertexManager to replace it, vertexId=" + logIdentifier);
LOG.debug("VertexReconfigureDoneEvent=" + reconfigureDoneEvent);
}
- ByteArrayOutputStream out = new ByteArrayOutputStream();
+ NonSyncByteArrayOutputStream out = new NonSyncByteArrayOutputStream();
try {
reconfigureDoneEvent.toProtoStream(out);
} catch (IOException e) {
@@ -4458,7 +4458,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
LOG.debug("initialize NoOpVertexManager");
}
configurationDoneEvent = new VertexConfigurationDoneEvent();
- configurationDoneEvent.fromProtoStream(new ByteArrayInputStream(getContext().getUserPayload().deepCopyAsArray()));
+ configurationDoneEvent.fromProtoStream(new NonSyncByteArrayInputStream(getContext().getUserPayload().deepCopyAsArray()));
String vertexName = getContext().getVertexName();
if (getContext().getVertexNumTasks(vertexName) == -1) {
Preconditions.checkArgument(configurationDoneEvent.isSetParallelismCalled(), "SetParallelism must be called "
http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
index 02728aa..4c0d201 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
@@ -18,10 +18,7 @@
package org.apache.tez.examples;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
@@ -41,6 +38,9 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.common.io.NonSyncByteArrayInputStream;
+import org.apache.tez.common.io.NonSyncByteArrayOutputStream;
+import org.apache.tez.common.io.NonSyncDataOutputStream;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.ProcessorContext;
@@ -171,8 +171,8 @@ public class JoinDataGen extends TezExampleBase {
public static byte[] createConfiguration(long streamOutputFileSize, long hashOutputFileSize)
throws IOException {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
+ NonSyncByteArrayOutputStream bos = new NonSyncByteArrayOutputStream();
+ NonSyncDataOutputStream dos = new NonSyncDataOutputStream(bos);
dos.writeLong(streamOutputFileSize);
dos.writeLong(hashOutputFileSize);
dos.close();
@@ -183,7 +183,7 @@ public class JoinDataGen extends TezExampleBase {
@Override
public void initialize() throws Exception {
byte[] payload = getContext().getUserPayload().deepCopyAsArray();
- ByteArrayInputStream bis = new ByteArrayInputStream(payload);
+ NonSyncByteArrayInputStream bis = new NonSyncByteArrayInputStream(payload);
DataInputStream dis = new DataInputStream(bis);
streamOutputFileSize = dis.readLong();
hashOutputFileSize = dis.readLong();
http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
index 6262e59..9b88c4d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
@@ -18,7 +18,6 @@
package org.apache.tez.mapreduce.hadoop;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -62,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.io.NonSyncDataOutputStream;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TaskLocationHint;
@@ -247,7 +247,7 @@ public class MRInputHelpers {
ByteString.Output os = ByteString
.newOutput(SPLIT_SERIALIZED_LENGTH_ESTIMATE);
- oldSplit.write(new DataOutputStream(os));
+ oldSplit.write(new NonSyncDataOutputStream(os));
ByteString splitBs = os.toByteString();
builder.setSplitBytes(splitBs);
http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
index b4f3df3..fb42129 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.commons.io.IOUtils;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.common.io.NonSyncByteArrayOutputStream;
import org.apache.tez.history.parser.datamodel.BaseParser;
import org.apache.tez.history.parser.datamodel.Constants;
import org.apache.tez.history.parser.datamodel.DagInfo;
@@ -35,7 +36,6 @@ import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -176,7 +176,7 @@ public class ATSFileParser extends BaseParser implements ATSData {
private JSONObject readJson(InputStream in) throws IOException, JSONException {
//Read entire content to memory
- final ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ final NonSyncByteArrayOutputStream bout = new NonSyncByteArrayOutputStream();
IOUtils.copy(in, bout);
return new JSONObject(new String(bout.toByteArray(), "UTF-8"));
}
http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
index 951ce30..9b88cfd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
@@ -29,6 +29,7 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.io.NonSyncByteArrayInputStream;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -55,7 +56,6 @@ import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.BitSet;
@@ -337,7 +337,7 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin {
ByteString compressedPartitionStats = proto.getPartitionStats();
byte[] rawData = TezCommonUtils.decompressByteStringToByteArray(
compressedPartitionStats);
- ByteArrayInputStream bin = new ByteArrayInputStream(rawData);
+ NonSyncByteArrayInputStream bin = new NonSyncByteArrayInputStream(rawData);
partitionStats.deserialize(new DataInputStream(bin));
parsePartitionStats(partitionStats);
http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java
index e25a325..78f1f3b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java
@@ -18,11 +18,11 @@
package org.apache.tez.runtime.library.common.shuffle;
-import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.tez.common.io.NonSyncByteArrayInputStream;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import com.google.common.base.Preconditions;
@@ -45,7 +45,7 @@ public class MemoryFetchedInput extends FetchedInput {
@Override
public InputStream getInputStream() {
- return new ByteArrayInputStream(byteStream.getBuffer());
+ return new NonSyncByteArrayInputStream(byteStream.getBuffer());
}
public byte[] getBytes() {
http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
index 12fe057..41e8432 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
@@ -18,7 +18,6 @@
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
-import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.File;
import java.io.FileOutputStream;
@@ -27,6 +26,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.tez.common.io.NonSyncByteArrayInputStream;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
@@ -38,7 +38,7 @@ import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
@InterfaceStability.Unstable
public class InMemoryReader extends Reader {
- private static class ByteArrayDataInput extends ByteArrayInputStream implements DataInput {
+ private static class ByteArrayDataInput extends NonSyncByteArrayInputStream implements DataInput {
public ByteArrayDataInput(byte buf[], int offset, int length) {
super(buf, offset, length);
http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
index 17d57a6..d2778d8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
@@ -17,7 +17,6 @@
*/
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
-import java.io.DataOutputStream;
import java.io.IOException;
import org.slf4j.Logger;
@@ -26,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.tez.common.io.NonSyncDataOutputStream;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
@@ -40,7 +40,7 @@ public class InMemoryWriter extends Writer {
public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
super(null, null);
this.out =
- new DataOutputStream(new IFileOutputStream(arrayStream));
+ new NonSyncDataOutputStream(new IFileOutputStream(arrayStream));
}
public void append(Object key, Object value) throws IOException {
http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 897d7d7..609e9ff 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -17,7 +17,6 @@
*/
package org.apache.tez.runtime.library.common.sort.impl;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
@@ -46,6 +45,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.io.NonSyncDataOutputStream;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.library.common.comparator.ProxyComparator;
import org.apache.hadoop.io.RawComparator;
@@ -863,7 +863,7 @@ public class PipelinedSorter extends ExternalSorter {
final byte[] rawkvmeta;
final int kvmetabase;
final ByteBuffer kvbuffer;
- final DataOutputStream out;
+ final NonSyncDataOutputStream out;
final RawComparator comparator;
final byte[] imeta = new byte[METASIZE];
@@ -895,7 +895,7 @@ public class PipelinedSorter extends ExternalSorter {
kvmeta = kvmetabuffer
.order(ByteOrder.nativeOrder())
.asIntBuffer();
- out = new DataOutputStream(
+ out = new NonSyncDataOutputStream(
new BufferStreamWrapper(kvbuffer));
this.comparator = comparator;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 69bfdb8..873d8e1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -18,7 +18,6 @@
package org.apache.tez.runtime.library.common.sort.impl.dflt;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -43,6 +42,7 @@ import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.Progress;
import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.io.NonSyncDataOutputStream;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
@@ -474,7 +474,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
/**
* Inner class managing the spill of serialized records to disk.
*/
- protected class BlockingBuffer extends DataOutputStream {
+ protected class BlockingBuffer extends NonSyncDataOutputStream {
public BlockingBuffer() {
super(new Buffer());
http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 152096c..eff29a5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -17,7 +17,6 @@
*/
package org.apache.tez.runtime.library.common.writers;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -52,6 +51,7 @@ import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.io.NonSyncDataOutputStream;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.OutputContext;
@@ -102,7 +102,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
@VisibleForTesting
final BlockingQueue<WrappedBuffer> availableBuffers;
private final ByteArrayOutputStream baos;
- private final DataOutputStream dos;
+ private final NonSyncDataOutputStream dos;
@VisibleForTesting
WrappedBuffer currentBuffer;
private final FileSystem rfs;
@@ -192,7 +192,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
}
currentBuffer = buffers[0];
baos = new ByteArrayOutputStream();
- dos = new DataOutputStream(baos);
+ dos = new NonSyncDataOutputStream(baos);
keySerializer.open(dos);
valSerializer.open(dos);
rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw();
http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java
index fe7984b..5c93e87 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java
@@ -17,12 +17,9 @@
*/
package org.apache.tez.mapreduce.examples;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -34,6 +31,9 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.client.TezClient;
+import org.apache.tez.common.io.NonSyncByteArrayInputStream;
+import org.apache.tez.common.io.NonSyncByteArrayOutputStream;
+import org.apache.tez.common.io.NonSyncDataOutputStream;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.GroupInputEdge;
@@ -172,8 +172,8 @@ public class MultipleCommitsExample extends TezExampleBase {
}
public UserPayload toUserPayload() throws IOException {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- this.write(new DataOutputStream(out));
+ NonSyncByteArrayOutputStream out = new NonSyncByteArrayOutputStream();
+ this.write(new NonSyncDataOutputStream(out));
return UserPayload.create(ByteBuffer.wrap(out.toByteArray()));
}
@@ -181,7 +181,7 @@ public class MultipleCommitsExample extends TezExampleBase {
throws IOException {
MultipleOutputProcessorConfig config = new MultipleOutputProcessorConfig();
config.readFields(new DataInputStream(
- new ByteArrayInputStream(payload.deepCopyAsArray())));
+ new NonSyncByteArrayInputStream(payload.deepCopyAsArray())));
return config;
}
}