You are viewing a plain-text version of this content; the canonical link to the original page was a hyperlink that is not preserved in this copy.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/05/08 00:25:56 UTC
tajo git commit: TAJO-1408 Make IntermediateEntryProto more compact.
(Contributed by navis, Committed by hyunsik)
Repository: tajo
Updated Branches:
refs/heads/master 9b3824b5f -> fab63900c
TAJO-1408 Make IntermediateEntryProto more compact. (Contributed by navis, Committed by hyunsik)
close #428
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/fab63900
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/fab63900
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/fab63900
Branch: refs/heads/master
Commit: fab63900cf44b61d571fb9c2982285bb8b669702
Parents: 9b3824b
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu May 7 14:20:24 2015 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu May 7 14:20:24 2015 -0700
----------------------------------------------------------------------
CHANGES | 3 +
.../java/org/apache/tajo/util/NumberUtil.java | 54 ++++++++++++
.../apache/tajo/querymaster/Repartitioner.java | 18 ++--
.../java/org/apache/tajo/querymaster/Task.java | 56 ++++++-------
.../tajo/worker/ExecutionBlockContext.java | 32 ++-----
.../src/main/proto/TajoWorkerProtocol.proto | 16 +---
.../apache/tajo/master/TestRepartitioner.java | 77 +++++++----------
.../tajo/querymaster/TestIntermediateEntry.java | 24 +++---
.../tajo/storage/HashShuffleAppender.java | 87 ++++++++++++++------
.../storage/HashShuffleAppenderManager.java | 12 +--
10 files changed, 205 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a8074cd..d448c09 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,9 @@ Release 0.11.0 - unreleased
IMPROVEMENT
+ TAJO-1408: Make IntermediateEntryProto more compact.
+ (Contributed by navis, Committed by hyunsik)
+
TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker.
(jinho)
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
index 0d70cc2..d14e0b4 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
@@ -18,6 +18,7 @@
package org.apache.tajo.util;
+import com.google.common.primitives.Longs;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.PlatformDependent;
@@ -25,6 +26,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Iterator;
// this is an implementation copied from LazyPrimitives in hive
public class NumberUtil {
@@ -1050,4 +1053,55 @@ public class NumberUtil {
return returnNumber;
}
+
+ public static long mergeToLong(int value1, int value2) {
+ return (long)value1 << 32 | value2 & 0xffffffffl;
+ }
+
+ public static int toHighInt(long value) {
+ return (int)(value >> 32);
+ }
+
+ public static int toLowInt(long value) {
+ return (int)value;
+ }
+
+ public static class PrimitiveLongs implements Iterable<Long> {
+ int index;
+ long[] longArray;
+
+ public PrimitiveLongs(int initLength) {
+ longArray = new long[initLength];
+ }
+ public void add(long value) {
+ reserve(1)[index++] = value;
+ }
+ public void add(long[] value) {
+ System.arraycopy(value, 0, reserve(value.length), index, value.length);
+ index += value.length;
+ }
+ public long[] backingArray() {
+ return longArray;
+ }
+ public long[] toArray() {
+ return Arrays.copyOfRange(longArray, 0, index);
+ }
+ public int size() {
+ return index;
+ }
+ private long[] reserve(int reserve) {
+ if (index + reserve < longArray.length) {
+ return longArray;
+ }
+ int newLength = Math.max(index + reserve, longArray.length << 1);
+ long[] newLongArray = new long[newLength];
+ System.arraycopy(longArray, 0, newLongArray, 0, index);
+ return longArray = newLongArray;
+ }
+
+ @Override
+ public Iterator<Long> iterator() {
+ return Longs.asList(toArray()).iterator();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index 8e9e343..545d615 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -1074,15 +1074,17 @@ public class Repartitioner {
firstSplitVolume = splitVolume;
}
- //Each Pair object in the splits variable is assigned to the next ExectionBlock's task.
+ //Each Pair object in the splits variable is assigned to the next ExecutionBlock's task.
//The first long value is a offset of the intermediate file and the second long value is length.
- List<Pair<Long, Long>> splits = currentInterm.split(firstSplitVolume, splitVolume);
- if (splits == null || splits.isEmpty()) {
+ long[] splits = currentInterm.split(firstSplitVolume, splitVolume);
+ if (splits == null || splits.length == 0) {
break;
}
- for (Pair<Long, Long> eachSplit: splits) {
- if (fetchListVolume > 0 && fetchListVolume + eachSplit.getSecond() >= splitVolume) {
+ for (int i = 0; i < splits.length; i += 2) {
+ long offset = splits[i];
+ long length = splits[i + 1];
+ if (fetchListVolume > 0 && fetchListVolume + length >= splitVolume) {
if (!fetchListForSingleTask.isEmpty()) {
fetches.add(fetchListForSingleTask);
}
@@ -1091,10 +1093,10 @@ public class Repartitioner {
}
FetchImpl fetch = new FetchImpl(currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE,
ebId, currentInterm.getPartId(), TUtil.newList(currentInterm));
- fetch.setOffset(eachSplit.getFirst());
- fetch.setLength(eachSplit.getSecond());
+ fetch.setOffset(offset);
+ fetch.setLength(length);
fetchListForSingleTask.add(fetch);
- fetchListVolume += eachSplit.getSecond();
+ fetchListVolume += length;
}
}
if (!fetchListForSingleTask.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
index 1da623e..d2be973 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
@@ -23,6 +23,7 @@ import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -34,7 +35,6 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto;
import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
import org.apache.tajo.master.TaskState;
import org.apache.tajo.master.event.*;
@@ -44,6 +44,7 @@ import org.apache.tajo.storage.DataLocation;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.NumberUtil;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.util.history.TaskHistory;
@@ -785,8 +786,8 @@ public class Task implements EventHandler<TaskEvent> {
int partId;
PullHost host;
long volume;
- List<Pair<Long, Integer>> pages;
- List<Pair<Long, Pair<Integer, Integer>>> failureRowNums;
+ long[] pages;
+ long[] failureRowNums;
public IntermediateEntry(IntermediateEntryProto proto) {
this.ebId = new ExecutionBlockId(proto.getEbId());
@@ -794,21 +795,12 @@ public class Task implements EventHandler<TaskEvent> {
this.attemptId = proto.getAttemptId();
this.partId = proto.getPartId();
- String[] pullHost = proto.getHost().split(":");
+ String [] pullHost = proto.getAddress().split(":");
this.host = new PullHost(pullHost[0], Integer.parseInt(pullHost[1]));
this.volume = proto.getVolume();
- failureRowNums = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
- for (FailureIntermediateProto eachFailure: proto.getFailuresList()) {
-
- failureRowNums.add(new Pair(eachFailure.getPagePos(),
- new Pair(eachFailure.getStartRowNum(), eachFailure.getEndRowNum())));
- }
-
- pages = new ArrayList<Pair<Long, Integer>>();
- for (IntermediateEntryProto.PageProto eachPage: proto.getPagesList()) {
- pages.add(new Pair(eachPage.getPos(), eachPage.getLength()));
- }
+ this.failureRowNums = Longs.toArray(proto.getFailuresList());
+ this.pages = Longs.toArray(proto.getPagesList());
}
public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) {
@@ -858,15 +850,15 @@ public class Task implements EventHandler<TaskEvent> {
return this.volume = volume;
}
- public List<Pair<Long, Integer>> getPages() {
+ public long[] getPages() {
return pages;
}
- public void setPages(List<Pair<Long, Integer>> pages) {
+ public void setPages(long[] pages) {
this.pages = pages;
}
- public List<Pair<Long, Pair<Integer, Integer>>> getFailureRowNums() {
+ public long[] getFailureRowNums() {
return failureRowNums;
}
@@ -875,38 +867,38 @@ public class Task implements EventHandler<TaskEvent> {
return Objects.hashCode(ebId, taskId, partId, attemptId, host);
}
- public List<Pair<Long, Long>> split(long firstSplitVolume, long splitVolume) {
- List<Pair<Long, Long>> splits = new ArrayList<Pair<Long, Long>>();
+ public long[] split(long firstSplitVolume, long splitVolume) {
- if (pages == null || pages.isEmpty()) {
- return splits;
+ if (pages == null || pages.length == 0) {
+ return null;
}
- int pageSize = pages.size();
+ NumberUtil.PrimitiveLongs splits = new NumberUtil.PrimitiveLongs(100);
long currentOffset = -1;
long currentBytes = 0;
long realSplitVolume = firstSplitVolume > 0 ? firstSplitVolume : splitVolume;
- for (int i = 0; i < pageSize; i++) {
- Pair<Long, Integer> eachPage = pages.get(i);
+ for (int i = 0; i < pages.length; i += 2) {
if (currentOffset == -1) {
- currentOffset = eachPage.getFirst();
+ currentOffset = pages[i];
}
- if (currentBytes > 0 && currentBytes + eachPage.getSecond() >= realSplitVolume) {
- splits.add(new Pair(currentOffset, currentBytes));
- currentOffset = eachPage.getFirst();
+ if (currentBytes > 0 && currentBytes + pages[i + 1] >= realSplitVolume) {
+ splits.add(currentOffset);
+ splits.add(currentBytes);
+ currentOffset = pages[i];
currentBytes = 0;
realSplitVolume = splitVolume;
}
- currentBytes += eachPage.getSecond();
+ currentBytes += pages[i + 1];
}
//add last
if (currentBytes > 0) {
- splits.add(new Pair(currentOffset, currentBytes));
+ splits.add(currentOffset);
+ splits.add(currentBytes);
}
- return splits;
+ return splits.toArray();
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index cd4b6a6..270000a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -41,12 +41,12 @@ import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.Pair;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@@ -312,38 +312,18 @@ public class ExecutionBlockContext {
}
IntermediateEntryProto.Builder intermediateBuilder = IntermediateEntryProto.newBuilder();
- IntermediateEntryProto.PageProto.Builder pageBuilder = IntermediateEntryProto.PageProto.newBuilder();
- FailureIntermediateProto.Builder failureBuilder = FailureIntermediateProto.newBuilder();
+ WorkerConnectionInfo connectionInfo = getWorkerContext().getConnectionInfo();
+ String address = connectionInfo.getHost() + ":" + connectionInfo.getPullServerPort();
for (HashShuffleAppenderManager.HashShuffleIntermediate eachShuffle: shuffles) {
- List<IntermediateEntryProto.PageProto> pages = Lists.newArrayList();
- List<FailureIntermediateProto> failureIntermediateItems = Lists.newArrayList();
-
- for (Pair<Long, Integer> eachPage: eachShuffle.getPages()) {
- pageBuilder.clear();
- pageBuilder.setPos(eachPage.getFirst());
- pageBuilder.setLength(eachPage.getSecond());
- pages.add(pageBuilder.build());
- }
-
- for(Pair<Long, Pair<Integer, Integer>> eachFailure: eachShuffle.getFailureTskTupleIndexes()) {
- failureBuilder.clear();
- failureBuilder.setPagePos(eachFailure.getFirst());
- failureBuilder.setStartRowNum(eachFailure.getSecond().getFirst());
- failureBuilder.setEndRowNum(eachFailure.getSecond().getSecond());
- failureIntermediateItems.add(failureBuilder.build());
- }
- intermediateBuilder.clear();
-
intermediateBuilder.setEbId(ebId.getProto())
- .setHost(getWorkerContext().getConnectionInfo().getHost() + ":" +
- getWorkerContext().getConnectionInfo().getPullServerPort())
+ .setAddress(address)
.setTaskId(-1)
.setAttemptId(-1)
.setPartId(eachShuffle.getPartId())
.setVolume(eachShuffle.getVolume())
- .addAllPages(pages)
- .addAllFailures(failureIntermediateItems);
+ .addAllPages(eachShuffle.getPages())
+ .addAllFailures(eachShuffle.getFailureTskTupleIndexes());
intermediateEntries.add(intermediateBuilder.build());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index fddef8f..5d8e446 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -97,25 +97,15 @@ message FetchProto {
optional int64 length = 12;
}
-message FailureIntermediateProto {
- required int64 pagePos = 1;
- required int32 startRowNum = 2;
- required int32 endRowNum = 3;
-}
-
message IntermediateEntryProto {
- message PageProto {
- required int64 pos = 1;
- required int32 length = 2;
- }
required ExecutionBlockIdProto ebId = 1;
required int32 taskId = 2;
required int32 attemptId = 3;
required int32 partId = 4;
- required string host = 5;
+ required string address = 5;
required int64 volume = 6;
- repeated PageProto pages = 7;
- repeated FailureIntermediateProto failures = 8;
+ repeated int64 pages = 7; // flattened pairs: [pos, length, pos, length, ...]
+ repeated int64 failures = 8; // flattened pairs: [pagePos, (startRowNum << 32 | endRowNum), ...]
}
message ExecutionBlockReport {
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index 9910d79..17706f4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -28,6 +28,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.querymaster.Task;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
import org.apache.tajo.querymaster.Repartitioner;
+import org.apache.tajo.util.NumberUtil;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.FetchImpl;
@@ -176,20 +177,7 @@ public class TestRepartitioner {
List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>();
int[] pageLengths = {10 * 1024 * 1024, 10 * 1024 * 1024, 10 * 1024 * 1024, 5 * 1024 * 1024}; //35 MB
- long expectedTotalLength = 0;
- for (int i = 0; i < 20; i++) {
- List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
- long offset = 0;
- for (int j = 0; j < pageLengths.length; j++) {
- pages.add(new Pair(offset, pageLengths[j]));
- offset += pageLengths[j];
- expectedTotalLength += pageLengths[j];
- }
- IntermediateEntry interm = new IntermediateEntry(i, -1, -1, new Task.PullHost("" + i, i));
- interm.setPages(pages);
- interm.setVolume(offset);
- intermediateEntries.add(interm);
- }
+ long expectedTotalLength = makeIntermediates(pageLengths, true, intermediateEntries);
long splitVolume = 128 * 1024 * 1024;
List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
@@ -221,6 +209,27 @@ public class TestRepartitioner {
assertEquals(expectedTotalLength, totalLength);
}
+ private long makeIntermediates(int[] pageLengths, boolean uniqueHosts,
+ List<IntermediateEntry> intermediateEntries) {
+ long expectedTotalLength = 0;
+ for (int i = 0; i < 20; i++) {
+ NumberUtil.PrimitiveLongs pages = new NumberUtil.PrimitiveLongs(10);
+ long offset = 0;
+ for (int pageLength : pageLengths) {
+ pages.add(offset);
+ pages.add(pageLength);
+ offset += pageLength;
+ expectedTotalLength += pageLength;
+ }
+ IntermediateEntry interm = new IntermediateEntry(i, -1, -1,
+ new Task.PullHost(uniqueHosts ? "" + i : "", uniqueHosts ? i : 0));
+ interm.setPages(pages.toArray());
+ interm.setVolume(offset);
+ intermediateEntries.add(interm);
+ }
+ return expectedTotalLength;
+ }
+
@Test
public void testSplitIntermediates() {
List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>();
@@ -234,20 +243,7 @@ public class TestRepartitioner {
}
}
- long expectedTotalLength = 0;
- for (int i = 0; i < 20; i++) {
- List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
- long offset = 0;
- for (int j = 0; j < pageLengths.length; j++) {
- pages.add(new Pair(offset, pageLengths[j]));
- offset += pageLengths[j];
- expectedTotalLength += pageLengths[j];
- }
- IntermediateEntry interm = new IntermediateEntry(i, -1, 0, new Task.PullHost("" + i, i));
- interm.setPages(pages);
- interm.setVolume(offset);
- intermediateEntries.add(interm);
- }
+ long expectedTotalLength = makeIntermediates(pageLengths, true, intermediateEntries);
long splitVolume = 128 * 1024 * 1024;
List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
@@ -368,12 +364,12 @@ public class TestRepartitioner {
List<IntermediateEntry> entries = new ArrayList<IntermediateEntry>();
for (int i = 0; i < 2; i++) {
- List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
- for (int j = 0; j < pageDatas.length; j++) {
- pages.add(new Pair(pageDatas[j][0], (int) (pageDatas[j][1])));
+ NumberUtil.PrimitiveLongs pages = new NumberUtil.PrimitiveLongs(10);
+ for (long[] pageData : pageDatas) {
+ pages.add(pageData);
}
IntermediateEntry entry = new IntermediateEntry(-1, -1, 1, new Task.PullHost("host" + i , 9000));
- entry.setPages(pages);
+ entry.setPages(pages.toArray());
entries.add(entry);
}
@@ -421,22 +417,7 @@ public class TestRepartitioner {
}
}
- long expectedTotalLength = 0;
- Task.PullHost pullHost = new Task.PullHost("host", 0);
-
- for (int i = 0; i < 20; i++) {
- List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
- long offset = 0;
- for (int j = 0; j < pageLengths.length; j++) {
- pages.add(new Pair(offset, pageLengths[j]));
- offset += pageLengths[j];
- expectedTotalLength += pageLengths[j];
- }
- IntermediateEntry interm = new IntermediateEntry(i, -1, 0, pullHost);
- interm.setPages(pages);
- interm.setVolume(offset);
- intermediateEntries.add(interm);
- }
+ long expectedTotalLength = makeIntermediates(pageLengths, false, intermediateEntries);
long splitVolume = 128 * 1024 * 1024;
List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
index 237fb32..b81085b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
@@ -18,12 +18,9 @@
package org.apache.tajo.querymaster;
-import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.NumberUtil;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.List;
-
import static org.junit.Assert.assertEquals;
public class TestIntermediateEntry {
@@ -31,23 +28,22 @@ public class TestIntermediateEntry {
public void testPage() {
Task.IntermediateEntry interm = new Task.IntermediateEntry(-1, -1, 1, null);
- List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
- pages.add(new Pair(0L, 1441275));
- pages.add(new Pair(1441275L, 1447446));
- pages.add(new Pair(2888721L, 1442507));
+ NumberUtil.PrimitiveLongs pages = new NumberUtil.PrimitiveLongs(10);
+ pages.add(new long[]{0L, 1441275});
+ pages.add(new long[]{1441275L, 1447446});
+ pages.add(new long[]{2888721L, 1442507});
- interm.setPages(pages);
+ interm.setPages(pages.toArray());
long splitBytes = 3 * 1024 * 1024;
- List<Pair<Long, Long>> splits = interm.split(splitBytes, splitBytes);
- assertEquals(2, splits.size());
+ long[] splits = interm.split(splitBytes, splitBytes);
+ assertEquals(2 << 1, splits.length);
long[][] expected = { {0, 1441275 + 1447446}, {1441275 + 1447446, 1442507} };
for (int i = 0; i < 2; i++) {
- Pair<Long, Long> eachSplit = splits.get(i);
- assertEquals(expected[i][0], eachSplit.getFirst().longValue());
- assertEquals(expected[i][1], eachSplit.getSecond().longValue());
+ assertEquals(expected[i][0], splits[i << 1]);
+ assertEquals(expected[i][1], splits[(i << 1) + 1]);
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
index 4c772c9..ccf5dae 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
@@ -18,16 +18,21 @@
package org.apache.tajo.storage;
+import com.google.common.primitives.Longs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.NumberUtil;
+import org.apache.tajo.util.NumberUtil.PrimitiveLongs;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,12 +47,12 @@ public class HashShuffleAppender implements Appender {
private TableStats tableStats;
//<taskId,<page start offset,<task start, task end>>>
- private Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;
+ private Map<TaskAttemptId, PrimitiveLongs> taskTupleIndexes;
//page start offset, length
- private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+ private PrimitiveLongs pages = new PrimitiveLongs(100);
- private Pair<Long, Integer> currentPage;
+ private long[] currentPage;
private int pageSize; //MB
@@ -68,8 +73,8 @@ public class HashShuffleAppender implements Appender {
@Override
public void init() throws IOException {
- currentPage = new Pair(0L, 0);
- taskTupleIndexes = new HashMap<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>();
+ currentPage = new long[2];
+ taskTupleIndexes = new HashMap<TaskAttemptId, PrimitiveLongs>();
rowNumInPage = 0;
}
@@ -96,16 +101,16 @@ public class HashShuffleAppender implements Appender {
int writtenBytes = (int)(posAfterWritten - currentPos);
int nextRowNum = rowNumInPage + tuples.size();
- List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId);
+ PrimitiveLongs taskIndexes = taskTupleIndexes.get(taskId);
if (taskIndexes == null) {
- taskIndexes = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
+ taskIndexes = new PrimitiveLongs(100);
taskTupleIndexes.put(taskId, taskIndexes);
}
- taskIndexes.add(
- new Pair<Long, Pair<Integer, Integer>>(currentPage.getFirst(), new Pair(rowNumInPage, nextRowNum)));
+ taskIndexes.add(currentPage[0]);
+ taskIndexes.add(NumberUtil.mergeToLong(rowNumInPage, nextRowNum));
rowNumInPage = nextRowNum;
- if (posAfterWritten - currentPage.getFirst() > pageSize) {
+ if (posAfterWritten - currentPage[0] > pageSize) {
nextPage(posAfterWritten);
rowNumInPage = 0;
}
@@ -124,9 +129,9 @@ public class HashShuffleAppender implements Appender {
}
private void nextPage(long pos) {
- currentPage.setSecond((int) (pos - currentPage.getFirst()));
+ currentPage[1] = pos - currentPage[0];
pages.add(currentPage);
- currentPage = new Pair(pos, 0);
+ currentPage = new long[] {pos, 0};
}
@Override
@@ -157,16 +162,18 @@ public class HashShuffleAppender implements Appender {
}
appender.flush();
offset = appender.getOffset();
- if (offset > currentPage.getFirst()) {
+ if (offset > currentPage[0]) {
nextPage(offset);
}
appender.close();
if (LOG.isDebugEnabled()) {
- if (!pages.isEmpty()) {
- LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()
- + ", lastPage=" + pages.get(pages.size() - 1));
+ int size = pages.size();
+ if (size > 0) {
+ long[] array = pages.backingArray();
+ LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + size
+ + ", lastPage=" + array[size - 2] + ", " + array[size - 1]);
} else {
- LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size());
+ LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + size);
}
}
closed.set(true);
@@ -185,22 +192,48 @@ public class HashShuffleAppender implements Appender {
}
}
- public List<Pair<Long, Integer>> getPages() {
+ public PrimitiveLongs getPages() {
return pages;
}
- public Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() {
+ public Map<TaskAttemptId, PrimitiveLongs> getTaskTupleIndexes() {
return taskTupleIndexes;
}
- public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() {
- List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
-
- for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: taskTupleIndexes.values()) {
- merged.addAll(eachFailureIndex);
- }
+ public Iterable<Long> getMergedTupleIndexes() {
+ return getIterable(taskTupleIndexes.values());
+ }
- return merged;
+ public Iterable<Long> getIterable(final Collection<PrimitiveLongs> values) {
+ return new Iterable<Long>() {
+ @Override
+ public Iterator<Long> iterator() {
+ final Iterator<PrimitiveLongs> iterator1 = values.iterator();
+ return new Iterator<Long>() {
+ Iterator<Long> iterator2 = null;
+ @Override
+ public boolean hasNext() {
+ while (iterator2 == null || !iterator2.hasNext()) {
+ if (!iterator1.hasNext()) {
+ return false;
+ }
+ iterator2 = iterator1.next().iterator();
+ }
+ return true;
+ }
+
+ @Override
+ public Long next() {
+ return iterator2.next();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
}
public void taskFinished(TaskAttemptId taskId) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index d2e9b4d..4635b76 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -176,14 +176,14 @@ public class HashShuffleAppenderManager {
private long volume;
//[<page start offset,<task start, task end>>]
- private Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes;
+ private Iterable<Long> failureTskTupleIndexes;
//[<page start offset, length>]
- private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+ private Iterable<Long> pages;
public HashShuffleIntermediate(int partId, long volume,
- List<Pair<Long, Integer>> pages,
- Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes) {
+ Iterable<Long> pages,
+ Iterable<Long> failureTskTupleIndexes) {
this.partId = partId;
this.volume = volume;
this.failureTskTupleIndexes = failureTskTupleIndexes;
@@ -198,11 +198,11 @@ public class HashShuffleAppenderManager {
return volume;
}
- public Collection<Pair<Long, Pair<Integer, Integer>>> getFailureTskTupleIndexes() {
+ public Iterable<Long> getFailureTskTupleIndexes() {
return failureTskTupleIndexes;
}
- public List<Pair<Long, Integer>> getPages() {
+ public Iterable<Long> getPages() {
return pages;
}
}