You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this text.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/06/24 12:36:02 UTC
tajo git commit: Revert "TAJO-1408 Make IntermediateEntryProto more compact. (Contributed by navis, Committed by hyunsik)"
Repository: tajo
Updated Branches:
refs/heads/master b24d18f5b -> 7c8477dd3
Revert "TAJO-1408 Make IntermediateEntryProto more compact. (Contributed by navis, Committed by hyunsik)"
This reverts commit fab63900cf44b61d571fb9c2982285bb8b669702.
Conflicts:
CHANGES
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7c8477dd
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7c8477dd
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7c8477dd
Branch: refs/heads/master
Commit: 7c8477dd3ac15188353044981746ecfef87f2f02
Parents: b24d18f
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Jun 24 03:05:45 2015 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Jun 24 03:05:45 2015 -0700
----------------------------------------------------------------------
CHANGES | 3 -
.../java/org/apache/tajo/util/NumberUtil.java | 54 ------------
.../apache/tajo/querymaster/Repartitioner.java | 18 ++--
.../java/org/apache/tajo/querymaster/Task.java | 56 +++++++------
.../tajo/worker/ExecutionBlockContext.java | 31 +++++--
.../src/main/proto/TajoWorkerProtocol.proto | 16 +++-
.../apache/tajo/master/TestRepartitioner.java | 77 ++++++++++-------
.../tajo/querymaster/TestIntermediateEntry.java | 24 +++---
.../tajo/storage/HashShuffleAppender.java | 87 ++++++--------------
.../storage/HashShuffleAppenderManager.java | 12 +--
10 files changed, 174 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/7c8477dd/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 170b928..1f69ac0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -72,9 +72,6 @@ Release 0.11.0 - unreleased
eliminates an important kind of information.
(Contributed by Jongyoung Park, Committed by jihoon)
- TAJO-1408: Make IntermediateEntryProto more compact.
- (Contributed by navis, Committed by hyunsik)
-
TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker.
(jinho)
http://git-wip-us.apache.org/repos/asf/tajo/blob/7c8477dd/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
index d14e0b4..0d70cc2 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
@@ -18,7 +18,6 @@
package org.apache.tajo.util;
-import com.google.common.primitives.Longs;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.PlatformDependent;
@@ -26,8 +25,6 @@ import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.Iterator;
// this is an implementation copied from LazyPrimitives in hive
public class NumberUtil {
@@ -1053,55 +1050,4 @@ public class NumberUtil {
return returnNumber;
}
-
- public static long mergeToLong(int value1, int value2) {
- return (long)value1 << 32 | value2 & 0xffffffffl;
- }
-
- public static int toHighInt(long value) {
- return (int)(value >> 32);
- }
-
- public static int toLowInt(long value) {
- return (int)value;
- }
-
- public static class PrimitiveLongs implements Iterable<Long> {
- int index;
- long[] longArray;
-
- public PrimitiveLongs(int initLength) {
- longArray = new long[initLength];
- }
- public void add(long value) {
- reserve(1)[index++] = value;
- }
- public void add(long[] value) {
- System.arraycopy(value, 0, reserve(value.length), index, value.length);
- index += value.length;
- }
- public long[] backingArray() {
- return longArray;
- }
- public long[] toArray() {
- return Arrays.copyOfRange(longArray, 0, index);
- }
- public int size() {
- return index;
- }
- private long[] reserve(int reserve) {
- if (index + reserve < longArray.length) {
- return longArray;
- }
- int newLength = Math.max(index + reserve, longArray.length << 1);
- long[] newLongArray = new long[newLength];
- System.arraycopy(longArray, 0, newLongArray, 0, index);
- return longArray = newLongArray;
- }
-
- @Override
- public Iterator<Long> iterator() {
- return Longs.asList(toArray()).iterator();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7c8477dd/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index ec09145..2c3e9e2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -1044,17 +1044,15 @@ public class Repartitioner {
firstSplitVolume = splitVolume;
}
- //Each Pair object in the splits variable is assigned to the next ExecutionBlock's task.
+ //Each Pair object in the splits variable is assigned to the next ExectionBlock's task.
//The first long value is a offset of the intermediate file and the second long value is length.
- long[] splits = currentInterm.split(firstSplitVolume, splitVolume);
- if (splits == null || splits.length == 0) {
+ List<Pair<Long, Long>> splits = currentInterm.split(firstSplitVolume, splitVolume);
+ if (splits == null || splits.isEmpty()) {
break;
}
- for (int i = 0; i < splits.length; i += 2) {
- long offset = splits[i];
- long length = splits[i + 1];
- if (fetchListVolume > 0 && fetchListVolume + length >= splitVolume) {
+ for (Pair<Long, Long> eachSplit: splits) {
+ if (fetchListVolume > 0 && fetchListVolume + eachSplit.getSecond() >= splitVolume) {
if (!fetchListForSingleTask.isEmpty()) {
fetches.add(fetchListForSingleTask);
}
@@ -1063,10 +1061,10 @@ public class Repartitioner {
}
FetchImpl fetch = new FetchImpl(currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE,
ebId, currentInterm.getPartId(), TUtil.newList(currentInterm));
- fetch.setOffset(offset);
- fetch.setLength(length);
+ fetch.setOffset(eachSplit.getFirst());
+ fetch.setLength(eachSplit.getSecond());
fetchListForSingleTask.add(fetch);
- fetchListVolume += length;
+ fetchListVolume += eachSplit.getSecond();
}
}
if (!fetchListForSingleTask.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/7c8477dd/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
index d2be973..1da623e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
@@ -23,7 +23,6 @@ import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.primitives.Longs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -35,6 +34,7 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto;
import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
import org.apache.tajo.master.TaskState;
import org.apache.tajo.master.event.*;
@@ -44,7 +44,6 @@ import org.apache.tajo.storage.DataLocation;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.util.NumberUtil;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.util.history.TaskHistory;
@@ -786,8 +785,8 @@ public class Task implements EventHandler<TaskEvent> {
int partId;
PullHost host;
long volume;
- long[] pages;
- long[] failureRowNums;
+ List<Pair<Long, Integer>> pages;
+ List<Pair<Long, Pair<Integer, Integer>>> failureRowNums;
public IntermediateEntry(IntermediateEntryProto proto) {
this.ebId = new ExecutionBlockId(proto.getEbId());
@@ -795,12 +794,21 @@ public class Task implements EventHandler<TaskEvent> {
this.attemptId = proto.getAttemptId();
this.partId = proto.getPartId();
- String [] pullHost = proto.getAddress().split(":");
+ String[] pullHost = proto.getHost().split(":");
this.host = new PullHost(pullHost[0], Integer.parseInt(pullHost[1]));
this.volume = proto.getVolume();
- this.failureRowNums = Longs.toArray(proto.getFailuresList());
- this.pages = Longs.toArray(proto.getPagesList());
+ failureRowNums = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
+ for (FailureIntermediateProto eachFailure: proto.getFailuresList()) {
+
+ failureRowNums.add(new Pair(eachFailure.getPagePos(),
+ new Pair(eachFailure.getStartRowNum(), eachFailure.getEndRowNum())));
+ }
+
+ pages = new ArrayList<Pair<Long, Integer>>();
+ for (IntermediateEntryProto.PageProto eachPage: proto.getPagesList()) {
+ pages.add(new Pair(eachPage.getPos(), eachPage.getLength()));
+ }
}
public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) {
@@ -850,15 +858,15 @@ public class Task implements EventHandler<TaskEvent> {
return this.volume = volume;
}
- public long[] getPages() {
+ public List<Pair<Long, Integer>> getPages() {
return pages;
}
- public void setPages(long[] pages) {
+ public void setPages(List<Pair<Long, Integer>> pages) {
this.pages = pages;
}
- public long[] getFailureRowNums() {
+ public List<Pair<Long, Pair<Integer, Integer>>> getFailureRowNums() {
return failureRowNums;
}
@@ -867,38 +875,38 @@ public class Task implements EventHandler<TaskEvent> {
return Objects.hashCode(ebId, taskId, partId, attemptId, host);
}
- public long[] split(long firstSplitVolume, long splitVolume) {
+ public List<Pair<Long, Long>> split(long firstSplitVolume, long splitVolume) {
+ List<Pair<Long, Long>> splits = new ArrayList<Pair<Long, Long>>();
- if (pages == null || pages.length == 0) {
- return null;
+ if (pages == null || pages.isEmpty()) {
+ return splits;
}
+ int pageSize = pages.size();
- NumberUtil.PrimitiveLongs splits = new NumberUtil.PrimitiveLongs(100);
long currentOffset = -1;
long currentBytes = 0;
long realSplitVolume = firstSplitVolume > 0 ? firstSplitVolume : splitVolume;
- for (int i = 0; i < pages.length; i += 2) {
+ for (int i = 0; i < pageSize; i++) {
+ Pair<Long, Integer> eachPage = pages.get(i);
if (currentOffset == -1) {
- currentOffset = pages[i];
+ currentOffset = eachPage.getFirst();
}
- if (currentBytes > 0 && currentBytes + pages[i + 1] >= realSplitVolume) {
- splits.add(currentOffset);
- splits.add(currentBytes);
- currentOffset = pages[i];
+ if (currentBytes > 0 && currentBytes + eachPage.getSecond() >= realSplitVolume) {
+ splits.add(new Pair(currentOffset, currentBytes));
+ currentOffset = eachPage.getFirst();
currentBytes = 0;
realSplitVolume = splitVolume;
}
- currentBytes += pages[i + 1];
+ currentBytes += eachPage.getSecond();
}
//add last
if (currentBytes > 0) {
- splits.add(currentOffset);
- splits.add(currentBytes);
+ splits.add(new Pair(currentOffset, currentBytes));
}
- return splits.toArray();
+ return splits;
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7c8477dd/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 9e4a60f..cbd451d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -41,6 +41,7 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.Pair;
import java.io.IOException;
import java.net.ConnectException;
@@ -339,18 +340,38 @@ public class ExecutionBlockContext {
}
IntermediateEntryProto.Builder intermediateBuilder = IntermediateEntryProto.newBuilder();
+ IntermediateEntryProto.PageProto.Builder pageBuilder = IntermediateEntryProto.PageProto.newBuilder();
+ FailureIntermediateProto.Builder failureBuilder = FailureIntermediateProto.newBuilder();
- WorkerConnectionInfo connectionInfo = getWorkerContext().getConnectionInfo();
- String address = connectionInfo.getHost() + ":" + connectionInfo.getPullServerPort();
for (HashShuffleAppenderManager.HashShuffleIntermediate eachShuffle: shuffles) {
+ List<IntermediateEntryProto.PageProto> pages = Lists.newArrayList();
+ List<FailureIntermediateProto> failureIntermediateItems = Lists.newArrayList();
+
+ for (Pair<Long, Integer> eachPage: eachShuffle.getPages()) {
+ pageBuilder.clear();
+ pageBuilder.setPos(eachPage.getFirst());
+ pageBuilder.setLength(eachPage.getSecond());
+ pages.add(pageBuilder.build());
+ }
+
+ for(Pair<Long, Pair<Integer, Integer>> eachFailure: eachShuffle.getFailureTskTupleIndexes()) {
+ failureBuilder.clear();
+ failureBuilder.setPagePos(eachFailure.getFirst());
+ failureBuilder.setStartRowNum(eachFailure.getSecond().getFirst());
+ failureBuilder.setEndRowNum(eachFailure.getSecond().getSecond());
+ failureIntermediateItems.add(failureBuilder.build());
+ }
+ intermediateBuilder.clear();
+
intermediateBuilder.setEbId(ebId.getProto())
- .setAddress(address)
+ .setHost(getWorkerContext().getConnectionInfo().getHost() + ":" +
+ getWorkerContext().getConnectionInfo().getPullServerPort())
.setTaskId(-1)
.setAttemptId(-1)
.setPartId(eachShuffle.getPartId())
.setVolume(eachShuffle.getVolume())
- .addAllPages(eachShuffle.getPages())
- .addAllFailures(eachShuffle.getFailureTskTupleIndexes());
+ .addAllPages(pages)
+ .addAllFailures(failureIntermediateItems);
intermediateEntries.add(intermediateBuilder.build());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7c8477dd/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index 715b1e6..7cc4171 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -97,15 +97,25 @@ message FetchProto {
optional int64 length = 12;
}
+message FailureIntermediateProto {
+ required int64 pagePos = 1;
+ required int32 startRowNum = 2;
+ required int32 endRowNum = 3;
+}
+
message IntermediateEntryProto {
+ message PageProto {
+ required int64 pos = 1;
+ required int32 length = 2;
+ }
required ExecutionBlockIdProto ebId = 1;
required int32 taskId = 2;
required int32 attemptId = 3;
required int32 partId = 4;
- required string address = 5;
+ required string host = 5;
required int64 volume = 6;
- repeated int64 pages = 7; // pos : length
- repeated int64 failures = 8; // pagePos : startRowNum:endRowNum
+ repeated PageProto pages = 7;
+ repeated FailureIntermediateProto failures = 8;
}
message ExecutionBlockReport {
http://git-wip-us.apache.org/repos/asf/tajo/blob/7c8477dd/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index 17706f4..9910d79 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -28,7 +28,6 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.querymaster.Task;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
import org.apache.tajo.querymaster.Repartitioner;
-import org.apache.tajo.util.NumberUtil;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.FetchImpl;
@@ -177,7 +176,20 @@ public class TestRepartitioner {
List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>();
int[] pageLengths = {10 * 1024 * 1024, 10 * 1024 * 1024, 10 * 1024 * 1024, 5 * 1024 * 1024}; //35 MB
- long expectedTotalLength = makeIntermediates(pageLengths, true, intermediateEntries);
+ long expectedTotalLength = 0;
+ for (int i = 0; i < 20; i++) {
+ List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+ long offset = 0;
+ for (int j = 0; j < pageLengths.length; j++) {
+ pages.add(new Pair(offset, pageLengths[j]));
+ offset += pageLengths[j];
+ expectedTotalLength += pageLengths[j];
+ }
+ IntermediateEntry interm = new IntermediateEntry(i, -1, -1, new Task.PullHost("" + i, i));
+ interm.setPages(pages);
+ interm.setVolume(offset);
+ intermediateEntries.add(interm);
+ }
long splitVolume = 128 * 1024 * 1024;
List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
@@ -209,27 +221,6 @@ public class TestRepartitioner {
assertEquals(expectedTotalLength, totalLength);
}
- private long makeIntermediates(int[] pageLengths, boolean uniqueHosts,
- List<IntermediateEntry> intermediateEntries) {
- long expectedTotalLength = 0;
- for (int i = 0; i < 20; i++) {
- NumberUtil.PrimitiveLongs pages = new NumberUtil.PrimitiveLongs(10);
- long offset = 0;
- for (int pageLength : pageLengths) {
- pages.add(offset);
- pages.add(pageLength);
- offset += pageLength;
- expectedTotalLength += pageLength;
- }
- IntermediateEntry interm = new IntermediateEntry(i, -1, -1,
- new Task.PullHost(uniqueHosts ? "" + i : "", uniqueHosts ? i : 0));
- interm.setPages(pages.toArray());
- interm.setVolume(offset);
- intermediateEntries.add(interm);
- }
- return expectedTotalLength;
- }
-
@Test
public void testSplitIntermediates() {
List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>();
@@ -243,7 +234,20 @@ public class TestRepartitioner {
}
}
- long expectedTotalLength = makeIntermediates(pageLengths, true, intermediateEntries);
+ long expectedTotalLength = 0;
+ for (int i = 0; i < 20; i++) {
+ List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+ long offset = 0;
+ for (int j = 0; j < pageLengths.length; j++) {
+ pages.add(new Pair(offset, pageLengths[j]));
+ offset += pageLengths[j];
+ expectedTotalLength += pageLengths[j];
+ }
+ IntermediateEntry interm = new IntermediateEntry(i, -1, 0, new Task.PullHost("" + i, i));
+ interm.setPages(pages);
+ interm.setVolume(offset);
+ intermediateEntries.add(interm);
+ }
long splitVolume = 128 * 1024 * 1024;
List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
@@ -364,12 +368,12 @@ public class TestRepartitioner {
List<IntermediateEntry> entries = new ArrayList<IntermediateEntry>();
for (int i = 0; i < 2; i++) {
- NumberUtil.PrimitiveLongs pages = new NumberUtil.PrimitiveLongs(10);
- for (long[] pageData : pageDatas) {
- pages.add(pageData);
+ List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+ for (int j = 0; j < pageDatas.length; j++) {
+ pages.add(new Pair(pageDatas[j][0], (int) (pageDatas[j][1])));
}
IntermediateEntry entry = new IntermediateEntry(-1, -1, 1, new Task.PullHost("host" + i , 9000));
- entry.setPages(pages.toArray());
+ entry.setPages(pages);
entries.add(entry);
}
@@ -417,7 +421,22 @@ public class TestRepartitioner {
}
}
- long expectedTotalLength = makeIntermediates(pageLengths, false, intermediateEntries);
+ long expectedTotalLength = 0;
+ Task.PullHost pullHost = new Task.PullHost("host", 0);
+
+ for (int i = 0; i < 20; i++) {
+ List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+ long offset = 0;
+ for (int j = 0; j < pageLengths.length; j++) {
+ pages.add(new Pair(offset, pageLengths[j]));
+ offset += pageLengths[j];
+ expectedTotalLength += pageLengths[j];
+ }
+ IntermediateEntry interm = new IntermediateEntry(i, -1, 0, pullHost);
+ interm.setPages(pages);
+ interm.setVolume(offset);
+ intermediateEntries.add(interm);
+ }
long splitVolume = 128 * 1024 * 1024;
List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
http://git-wip-us.apache.org/repos/asf/tajo/blob/7c8477dd/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
index b81085b..237fb32 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
@@ -18,9 +18,12 @@
package org.apache.tajo.querymaster;
-import org.apache.tajo.util.NumberUtil;
+import org.apache.tajo.util.Pair;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+
import static org.junit.Assert.assertEquals;
public class TestIntermediateEntry {
@@ -28,22 +31,23 @@ public class TestIntermediateEntry {
public void testPage() {
Task.IntermediateEntry interm = new Task.IntermediateEntry(-1, -1, 1, null);
- NumberUtil.PrimitiveLongs pages = new NumberUtil.PrimitiveLongs(10);
- pages.add(new long[]{0L, 1441275});
- pages.add(new long[]{1441275L, 1447446});
- pages.add(new long[]{2888721L, 1442507});
+ List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+ pages.add(new Pair(0L, 1441275));
+ pages.add(new Pair(1441275L, 1447446));
+ pages.add(new Pair(2888721L, 1442507));
- interm.setPages(pages.toArray());
+ interm.setPages(pages);
long splitBytes = 3 * 1024 * 1024;
- long[] splits = interm.split(splitBytes, splitBytes);
- assertEquals(2 << 1, splits.length);
+ List<Pair<Long, Long>> splits = interm.split(splitBytes, splitBytes);
+ assertEquals(2, splits.size());
long[][] expected = { {0, 1441275 + 1447446}, {1441275 + 1447446, 1442507} };
for (int i = 0; i < 2; i++) {
- assertEquals(expected[i][0], splits[i << 1]);
- assertEquals(expected[i][1], splits[(i << 1) + 1]);
+ Pair<Long, Long> eachSplit = splits.get(i);
+ assertEquals(expected[i][0], eachSplit.getFirst().longValue());
+ assertEquals(expected[i][1], eachSplit.getSecond().longValue());
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7c8477dd/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
index ccf5dae..4c772c9 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
@@ -18,21 +18,16 @@
package org.apache.tajo.storage;
-import com.google.common.primitives.Longs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.util.NumberUtil;
-import org.apache.tajo.util.NumberUtil.PrimitiveLongs;
+import org.apache.tajo.util.Pair;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -47,12 +42,12 @@ public class HashShuffleAppender implements Appender {
private TableStats tableStats;
//<taskId,<page start offset,<task start, task end>>>
- private Map<TaskAttemptId, PrimitiveLongs> taskTupleIndexes;
+ private Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;
//page start offset, length
- private PrimitiveLongs pages = new PrimitiveLongs(100);
+ private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
- private long[] currentPage;
+ private Pair<Long, Integer> currentPage;
private int pageSize; //MB
@@ -73,8 +68,8 @@ public class HashShuffleAppender implements Appender {
@Override
public void init() throws IOException {
- currentPage = new long[2];
- taskTupleIndexes = new HashMap<TaskAttemptId, PrimitiveLongs>();
+ currentPage = new Pair(0L, 0);
+ taskTupleIndexes = new HashMap<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>();
rowNumInPage = 0;
}
@@ -101,16 +96,16 @@ public class HashShuffleAppender implements Appender {
int writtenBytes = (int)(posAfterWritten - currentPos);
int nextRowNum = rowNumInPage + tuples.size();
- PrimitiveLongs taskIndexes = taskTupleIndexes.get(taskId);
+ List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId);
if (taskIndexes == null) {
- taskIndexes = new PrimitiveLongs(100);
+ taskIndexes = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
taskTupleIndexes.put(taskId, taskIndexes);
}
- taskIndexes.add(currentPage[0]);
- taskIndexes.add(NumberUtil.mergeToLong(rowNumInPage, nextRowNum));
+ taskIndexes.add(
+ new Pair<Long, Pair<Integer, Integer>>(currentPage.getFirst(), new Pair(rowNumInPage, nextRowNum)));
rowNumInPage = nextRowNum;
- if (posAfterWritten - currentPage[0] > pageSize) {
+ if (posAfterWritten - currentPage.getFirst() > pageSize) {
nextPage(posAfterWritten);
rowNumInPage = 0;
}
@@ -129,9 +124,9 @@ public class HashShuffleAppender implements Appender {
}
private void nextPage(long pos) {
- currentPage[1] = pos - currentPage[0];
+ currentPage.setSecond((int) (pos - currentPage.getFirst()));
pages.add(currentPage);
- currentPage = new long[] {pos, 0};
+ currentPage = new Pair(pos, 0);
}
@Override
@@ -162,18 +157,16 @@ public class HashShuffleAppender implements Appender {
}
appender.flush();
offset = appender.getOffset();
- if (offset > currentPage[0]) {
+ if (offset > currentPage.getFirst()) {
nextPage(offset);
}
appender.close();
if (LOG.isDebugEnabled()) {
- int size = pages.size();
- if (size > 0) {
- long[] array = pages.backingArray();
- LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + size
- + ", lastPage=" + array[size - 2] + ", " + array[size - 1]);
+ if (!pages.isEmpty()) {
+ LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()
+ + ", lastPage=" + pages.get(pages.size() - 1));
} else {
- LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + size);
+ LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size());
}
}
closed.set(true);
@@ -192,48 +185,22 @@ public class HashShuffleAppender implements Appender {
}
}
- public PrimitiveLongs getPages() {
+ public List<Pair<Long, Integer>> getPages() {
return pages;
}
- public Map<TaskAttemptId, PrimitiveLongs> getTaskTupleIndexes() {
+ public Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() {
return taskTupleIndexes;
}
- public Iterable<Long> getMergedTupleIndexes() {
- return getIterable(taskTupleIndexes.values());
- }
+ public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() {
+ List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
- public Iterable<Long> getIterable(final Collection<PrimitiveLongs> values) {
- return new Iterable<Long>() {
- @Override
- public Iterator<Long> iterator() {
- final Iterator<PrimitiveLongs> iterator1 = values.iterator();
- return new Iterator<Long>() {
- Iterator<Long> iterator2 = null;
- @Override
- public boolean hasNext() {
- while (iterator2 == null || !iterator2.hasNext()) {
- if (!iterator1.hasNext()) {
- return false;
- }
- iterator2 = iterator1.next().iterator();
- }
- return true;
- }
-
- @Override
- public Long next() {
- return iterator2.next();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
- };
+ for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: taskTupleIndexes.values()) {
+ merged.addAll(eachFailureIndex);
+ }
+
+ return merged;
}
public void taskFinished(TaskAttemptId taskId) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/7c8477dd/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index 66c7f13..1b48fc0 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -174,14 +174,14 @@ public class HashShuffleAppenderManager {
private long volume;
//[<page start offset,<task start, task end>>]
- private Iterable<Long> failureTskTupleIndexes;
+ private Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes;
//[<page start offset, length>]
- private Iterable<Long> pages;
+ private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
public HashShuffleIntermediate(int partId, long volume,
- Iterable<Long> pages,
- Iterable<Long> failureTskTupleIndexes) {
+ List<Pair<Long, Integer>> pages,
+ Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes) {
this.partId = partId;
this.volume = volume;
this.failureTskTupleIndexes = failureTskTupleIndexes;
@@ -196,11 +196,11 @@ public class HashShuffleAppenderManager {
return volume;
}
- public Iterable<Long> getFailureTskTupleIndexes() {
+ public Collection<Pair<Long, Pair<Integer, Integer>>> getFailureTskTupleIndexes() {
return failureTskTupleIndexes;
}
- public Iterable<Long> getPages() {
+ public List<Pair<Long, Integer>> getPages() {
return pages;
}
}