You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/05/08 07:53:52 UTC
[1/4] tajo git commit: TAJO-1408 Make IntermediateEntryProto more compact. (Contributed by navis, Committed by hyunsik)
Repository: tajo
Updated Branches:
refs/heads/index_support 42bcf2de0 -> 410ca3188
TAJO-1408 Make IntermediateEntryProto more compact. (Contributed by navis, Committed by hyunsik)
close #428
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/fab63900
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/fab63900
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/fab63900
Branch: refs/heads/index_support
Commit: fab63900cf44b61d571fb9c2982285bb8b669702
Parents: 9b3824b
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu May 7 14:20:24 2015 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu May 7 14:20:24 2015 -0700
----------------------------------------------------------------------
CHANGES | 3 +
.../java/org/apache/tajo/util/NumberUtil.java | 54 ++++++++++++
.../apache/tajo/querymaster/Repartitioner.java | 18 ++--
.../java/org/apache/tajo/querymaster/Task.java | 56 ++++++-------
.../tajo/worker/ExecutionBlockContext.java | 32 ++-----
.../src/main/proto/TajoWorkerProtocol.proto | 16 +---
.../apache/tajo/master/TestRepartitioner.java | 77 +++++++----------
.../tajo/querymaster/TestIntermediateEntry.java | 24 +++---
.../tajo/storage/HashShuffleAppender.java | 87 ++++++++++++++------
.../storage/HashShuffleAppenderManager.java | 12 +--
10 files changed, 205 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a8074cd..d448c09 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,9 @@ Release 0.11.0 - unreleased
IMPROVEMENT
+ TAJO-1408: Make IntermediateEntryProto more compact.
+ (Contributed by navis, Committed by hyunsik)
+
TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker.
(jinho)
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
index 0d70cc2..d14e0b4 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
@@ -18,6 +18,7 @@
package org.apache.tajo.util;
+import com.google.common.primitives.Longs;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.PlatformDependent;
@@ -25,6 +26,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Iterator;
// this is an implementation copied from LazyPrimitives in hive
public class NumberUtil {
@@ -1050,4 +1053,55 @@ public class NumberUtil {
return returnNumber;
}
+
+ public static long mergeToLong(int value1, int value2) {
+ return (long)value1 << 32 | value2 & 0xffffffffl;
+ }
+
+ public static int toHighInt(long value) {
+ return (int)(value >> 32);
+ }
+
+ public static int toLowInt(long value) {
+ return (int)value;
+ }
+
+ public static class PrimitiveLongs implements Iterable<Long> {
+ int index;
+ long[] longArray;
+
+ public PrimitiveLongs(int initLength) {
+ longArray = new long[initLength];
+ }
+ public void add(long value) {
+ reserve(1)[index++] = value;
+ }
+ public void add(long[] value) {
+ System.arraycopy(value, 0, reserve(value.length), index, value.length);
+ index += value.length;
+ }
+ public long[] backingArray() {
+ return longArray;
+ }
+ public long[] toArray() {
+ return Arrays.copyOfRange(longArray, 0, index);
+ }
+ public int size() {
+ return index;
+ }
+ private long[] reserve(int reserve) {
+ if (index + reserve < longArray.length) {
+ return longArray;
+ }
+ int newLength = Math.max(index + reserve, longArray.length << 1);
+ long[] newLongArray = new long[newLength];
+ System.arraycopy(longArray, 0, newLongArray, 0, index);
+ return longArray = newLongArray;
+ }
+
+ @Override
+ public Iterator<Long> iterator() {
+ return Longs.asList(toArray()).iterator();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index 8e9e343..545d615 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -1074,15 +1074,17 @@ public class Repartitioner {
firstSplitVolume = splitVolume;
}
- //Each Pair object in the splits variable is assigned to the next ExectionBlock's task.
+ //Each Pair object in the splits variable is assigned to the next ExecutionBlock's task.
//The first long value is a offset of the intermediate file and the second long value is length.
- List<Pair<Long, Long>> splits = currentInterm.split(firstSplitVolume, splitVolume);
- if (splits == null || splits.isEmpty()) {
+ long[] splits = currentInterm.split(firstSplitVolume, splitVolume);
+ if (splits == null || splits.length == 0) {
break;
}
- for (Pair<Long, Long> eachSplit: splits) {
- if (fetchListVolume > 0 && fetchListVolume + eachSplit.getSecond() >= splitVolume) {
+ for (int i = 0; i < splits.length; i += 2) {
+ long offset = splits[i];
+ long length = splits[i + 1];
+ if (fetchListVolume > 0 && fetchListVolume + length >= splitVolume) {
if (!fetchListForSingleTask.isEmpty()) {
fetches.add(fetchListForSingleTask);
}
@@ -1091,10 +1093,10 @@ public class Repartitioner {
}
FetchImpl fetch = new FetchImpl(currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE,
ebId, currentInterm.getPartId(), TUtil.newList(currentInterm));
- fetch.setOffset(eachSplit.getFirst());
- fetch.setLength(eachSplit.getSecond());
+ fetch.setOffset(offset);
+ fetch.setLength(length);
fetchListForSingleTask.add(fetch);
- fetchListVolume += eachSplit.getSecond();
+ fetchListVolume += length;
}
}
if (!fetchListForSingleTask.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
index 1da623e..d2be973 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
@@ -23,6 +23,7 @@ import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -34,7 +35,6 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto;
import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
import org.apache.tajo.master.TaskState;
import org.apache.tajo.master.event.*;
@@ -44,6 +44,7 @@ import org.apache.tajo.storage.DataLocation;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.NumberUtil;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.util.history.TaskHistory;
@@ -785,8 +786,8 @@ public class Task implements EventHandler<TaskEvent> {
int partId;
PullHost host;
long volume;
- List<Pair<Long, Integer>> pages;
- List<Pair<Long, Pair<Integer, Integer>>> failureRowNums;
+ long[] pages;
+ long[] failureRowNums;
public IntermediateEntry(IntermediateEntryProto proto) {
this.ebId = new ExecutionBlockId(proto.getEbId());
@@ -794,21 +795,12 @@ public class Task implements EventHandler<TaskEvent> {
this.attemptId = proto.getAttemptId();
this.partId = proto.getPartId();
- String[] pullHost = proto.getHost().split(":");
+ String [] pullHost = proto.getAddress().split(":");
this.host = new PullHost(pullHost[0], Integer.parseInt(pullHost[1]));
this.volume = proto.getVolume();
- failureRowNums = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
- for (FailureIntermediateProto eachFailure: proto.getFailuresList()) {
-
- failureRowNums.add(new Pair(eachFailure.getPagePos(),
- new Pair(eachFailure.getStartRowNum(), eachFailure.getEndRowNum())));
- }
-
- pages = new ArrayList<Pair<Long, Integer>>();
- for (IntermediateEntryProto.PageProto eachPage: proto.getPagesList()) {
- pages.add(new Pair(eachPage.getPos(), eachPage.getLength()));
- }
+ this.failureRowNums = Longs.toArray(proto.getFailuresList());
+ this.pages = Longs.toArray(proto.getPagesList());
}
public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) {
@@ -858,15 +850,15 @@ public class Task implements EventHandler<TaskEvent> {
return this.volume = volume;
}
- public List<Pair<Long, Integer>> getPages() {
+ public long[] getPages() {
return pages;
}
- public void setPages(List<Pair<Long, Integer>> pages) {
+ public void setPages(long[] pages) {
this.pages = pages;
}
- public List<Pair<Long, Pair<Integer, Integer>>> getFailureRowNums() {
+ public long[] getFailureRowNums() {
return failureRowNums;
}
@@ -875,38 +867,38 @@ public class Task implements EventHandler<TaskEvent> {
return Objects.hashCode(ebId, taskId, partId, attemptId, host);
}
- public List<Pair<Long, Long>> split(long firstSplitVolume, long splitVolume) {
- List<Pair<Long, Long>> splits = new ArrayList<Pair<Long, Long>>();
+ public long[] split(long firstSplitVolume, long splitVolume) {
- if (pages == null || pages.isEmpty()) {
- return splits;
+ if (pages == null || pages.length == 0) {
+ return null;
}
- int pageSize = pages.size();
+ NumberUtil.PrimitiveLongs splits = new NumberUtil.PrimitiveLongs(100);
long currentOffset = -1;
long currentBytes = 0;
long realSplitVolume = firstSplitVolume > 0 ? firstSplitVolume : splitVolume;
- for (int i = 0; i < pageSize; i++) {
- Pair<Long, Integer> eachPage = pages.get(i);
+ for (int i = 0; i < pages.length; i += 2) {
if (currentOffset == -1) {
- currentOffset = eachPage.getFirst();
+ currentOffset = pages[i];
}
- if (currentBytes > 0 && currentBytes + eachPage.getSecond() >= realSplitVolume) {
- splits.add(new Pair(currentOffset, currentBytes));
- currentOffset = eachPage.getFirst();
+ if (currentBytes > 0 && currentBytes + pages[i + 1] >= realSplitVolume) {
+ splits.add(currentOffset);
+ splits.add(currentBytes);
+ currentOffset = pages[i];
currentBytes = 0;
realSplitVolume = splitVolume;
}
- currentBytes += eachPage.getSecond();
+ currentBytes += pages[i + 1];
}
//add last
if (currentBytes > 0) {
- splits.add(new Pair(currentOffset, currentBytes));
+ splits.add(currentOffset);
+ splits.add(currentBytes);
}
- return splits;
+ return splits.toArray();
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index cd4b6a6..270000a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -41,12 +41,12 @@ import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.Pair;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@@ -312,38 +312,18 @@ public class ExecutionBlockContext {
}
IntermediateEntryProto.Builder intermediateBuilder = IntermediateEntryProto.newBuilder();
- IntermediateEntryProto.PageProto.Builder pageBuilder = IntermediateEntryProto.PageProto.newBuilder();
- FailureIntermediateProto.Builder failureBuilder = FailureIntermediateProto.newBuilder();
+ WorkerConnectionInfo connectionInfo = getWorkerContext().getConnectionInfo();
+ String address = connectionInfo.getHost() + ":" + connectionInfo.getPullServerPort();
for (HashShuffleAppenderManager.HashShuffleIntermediate eachShuffle: shuffles) {
- List<IntermediateEntryProto.PageProto> pages = Lists.newArrayList();
- List<FailureIntermediateProto> failureIntermediateItems = Lists.newArrayList();
-
- for (Pair<Long, Integer> eachPage: eachShuffle.getPages()) {
- pageBuilder.clear();
- pageBuilder.setPos(eachPage.getFirst());
- pageBuilder.setLength(eachPage.getSecond());
- pages.add(pageBuilder.build());
- }
-
- for(Pair<Long, Pair<Integer, Integer>> eachFailure: eachShuffle.getFailureTskTupleIndexes()) {
- failureBuilder.clear();
- failureBuilder.setPagePos(eachFailure.getFirst());
- failureBuilder.setStartRowNum(eachFailure.getSecond().getFirst());
- failureBuilder.setEndRowNum(eachFailure.getSecond().getSecond());
- failureIntermediateItems.add(failureBuilder.build());
- }
- intermediateBuilder.clear();
-
intermediateBuilder.setEbId(ebId.getProto())
- .setHost(getWorkerContext().getConnectionInfo().getHost() + ":" +
- getWorkerContext().getConnectionInfo().getPullServerPort())
+ .setAddress(address)
.setTaskId(-1)
.setAttemptId(-1)
.setPartId(eachShuffle.getPartId())
.setVolume(eachShuffle.getVolume())
- .addAllPages(pages)
- .addAllFailures(failureIntermediateItems);
+ .addAllPages(eachShuffle.getPages())
+ .addAllFailures(eachShuffle.getFailureTskTupleIndexes());
intermediateEntries.add(intermediateBuilder.build());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index fddef8f..5d8e446 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -97,25 +97,15 @@ message FetchProto {
optional int64 length = 12;
}
-message FailureIntermediateProto {
- required int64 pagePos = 1;
- required int32 startRowNum = 2;
- required int32 endRowNum = 3;
-}
-
message IntermediateEntryProto {
- message PageProto {
- required int64 pos = 1;
- required int32 length = 2;
- }
required ExecutionBlockIdProto ebId = 1;
required int32 taskId = 2;
required int32 attemptId = 3;
required int32 partId = 4;
- required string host = 5;
+ required string address = 5;
required int64 volume = 6;
- repeated PageProto pages = 7;
- repeated FailureIntermediateProto failures = 8;
+ repeated int64 pages = 7; // pos : length
+ repeated int64 failures = 8; // pagePos : startRowNum:endRowNum
}
message ExecutionBlockReport {
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index 9910d79..17706f4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -28,6 +28,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.querymaster.Task;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
import org.apache.tajo.querymaster.Repartitioner;
+import org.apache.tajo.util.NumberUtil;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.FetchImpl;
@@ -176,20 +177,7 @@ public class TestRepartitioner {
List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>();
int[] pageLengths = {10 * 1024 * 1024, 10 * 1024 * 1024, 10 * 1024 * 1024, 5 * 1024 * 1024}; //35 MB
- long expectedTotalLength = 0;
- for (int i = 0; i < 20; i++) {
- List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
- long offset = 0;
- for (int j = 0; j < pageLengths.length; j++) {
- pages.add(new Pair(offset, pageLengths[j]));
- offset += pageLengths[j];
- expectedTotalLength += pageLengths[j];
- }
- IntermediateEntry interm = new IntermediateEntry(i, -1, -1, new Task.PullHost("" + i, i));
- interm.setPages(pages);
- interm.setVolume(offset);
- intermediateEntries.add(interm);
- }
+ long expectedTotalLength = makeIntermediates(pageLengths, true, intermediateEntries);
long splitVolume = 128 * 1024 * 1024;
List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
@@ -221,6 +209,27 @@ public class TestRepartitioner {
assertEquals(expectedTotalLength, totalLength);
}
+ private long makeIntermediates(int[] pageLengths, boolean uniqueHosts,
+ List<IntermediateEntry> intermediateEntries) {
+ long expectedTotalLength = 0;
+ for (int i = 0; i < 20; i++) {
+ NumberUtil.PrimitiveLongs pages = new NumberUtil.PrimitiveLongs(10);
+ long offset = 0;
+ for (int pageLength : pageLengths) {
+ pages.add(offset);
+ pages.add(pageLength);
+ offset += pageLength;
+ expectedTotalLength += pageLength;
+ }
+ IntermediateEntry interm = new IntermediateEntry(i, -1, -1,
+ new Task.PullHost(uniqueHosts ? "" + i : "", uniqueHosts ? i : 0));
+ interm.setPages(pages.toArray());
+ interm.setVolume(offset);
+ intermediateEntries.add(interm);
+ }
+ return expectedTotalLength;
+ }
+
@Test
public void testSplitIntermediates() {
List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>();
@@ -234,20 +243,7 @@ public class TestRepartitioner {
}
}
- long expectedTotalLength = 0;
- for (int i = 0; i < 20; i++) {
- List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
- long offset = 0;
- for (int j = 0; j < pageLengths.length; j++) {
- pages.add(new Pair(offset, pageLengths[j]));
- offset += pageLengths[j];
- expectedTotalLength += pageLengths[j];
- }
- IntermediateEntry interm = new IntermediateEntry(i, -1, 0, new Task.PullHost("" + i, i));
- interm.setPages(pages);
- interm.setVolume(offset);
- intermediateEntries.add(interm);
- }
+ long expectedTotalLength = makeIntermediates(pageLengths, true, intermediateEntries);
long splitVolume = 128 * 1024 * 1024;
List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
@@ -368,12 +364,12 @@ public class TestRepartitioner {
List<IntermediateEntry> entries = new ArrayList<IntermediateEntry>();
for (int i = 0; i < 2; i++) {
- List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
- for (int j = 0; j < pageDatas.length; j++) {
- pages.add(new Pair(pageDatas[j][0], (int) (pageDatas[j][1])));
+ NumberUtil.PrimitiveLongs pages = new NumberUtil.PrimitiveLongs(10);
+ for (long[] pageData : pageDatas) {
+ pages.add(pageData);
}
IntermediateEntry entry = new IntermediateEntry(-1, -1, 1, new Task.PullHost("host" + i , 9000));
- entry.setPages(pages);
+ entry.setPages(pages.toArray());
entries.add(entry);
}
@@ -421,22 +417,7 @@ public class TestRepartitioner {
}
}
- long expectedTotalLength = 0;
- Task.PullHost pullHost = new Task.PullHost("host", 0);
-
- for (int i = 0; i < 20; i++) {
- List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
- long offset = 0;
- for (int j = 0; j < pageLengths.length; j++) {
- pages.add(new Pair(offset, pageLengths[j]));
- offset += pageLengths[j];
- expectedTotalLength += pageLengths[j];
- }
- IntermediateEntry interm = new IntermediateEntry(i, -1, 0, pullHost);
- interm.setPages(pages);
- interm.setVolume(offset);
- intermediateEntries.add(interm);
- }
+ long expectedTotalLength = makeIntermediates(pageLengths, false, intermediateEntries);
long splitVolume = 128 * 1024 * 1024;
List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
index 237fb32..b81085b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestIntermediateEntry.java
@@ -18,12 +18,9 @@
package org.apache.tajo.querymaster;
-import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.NumberUtil;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.List;
-
import static org.junit.Assert.assertEquals;
public class TestIntermediateEntry {
@@ -31,23 +28,22 @@ public class TestIntermediateEntry {
public void testPage() {
Task.IntermediateEntry interm = new Task.IntermediateEntry(-1, -1, 1, null);
- List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
- pages.add(new Pair(0L, 1441275));
- pages.add(new Pair(1441275L, 1447446));
- pages.add(new Pair(2888721L, 1442507));
+ NumberUtil.PrimitiveLongs pages = new NumberUtil.PrimitiveLongs(10);
+ pages.add(new long[]{0L, 1441275});
+ pages.add(new long[]{1441275L, 1447446});
+ pages.add(new long[]{2888721L, 1442507});
- interm.setPages(pages);
+ interm.setPages(pages.toArray());
long splitBytes = 3 * 1024 * 1024;
- List<Pair<Long, Long>> splits = interm.split(splitBytes, splitBytes);
- assertEquals(2, splits.size());
+ long[] splits = interm.split(splitBytes, splitBytes);
+ assertEquals(2 << 1, splits.length);
long[][] expected = { {0, 1441275 + 1447446}, {1441275 + 1447446, 1442507} };
for (int i = 0; i < 2; i++) {
- Pair<Long, Long> eachSplit = splits.get(i);
- assertEquals(expected[i][0], eachSplit.getFirst().longValue());
- assertEquals(expected[i][1], eachSplit.getSecond().longValue());
+ assertEquals(expected[i][0], splits[i << 1]);
+ assertEquals(expected[i][1], splits[(i << 1) + 1]);
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
index 4c772c9..ccf5dae 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
@@ -18,16 +18,21 @@
package org.apache.tajo.storage;
+import com.google.common.primitives.Longs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.NumberUtil;
+import org.apache.tajo.util.NumberUtil.PrimitiveLongs;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,12 +47,12 @@ public class HashShuffleAppender implements Appender {
private TableStats tableStats;
//<taskId,<page start offset,<task start, task end>>>
- private Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;
+ private Map<TaskAttemptId, PrimitiveLongs> taskTupleIndexes;
//page start offset, length
- private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+ private PrimitiveLongs pages = new PrimitiveLongs(100);
- private Pair<Long, Integer> currentPage;
+ private long[] currentPage;
private int pageSize; //MB
@@ -68,8 +73,8 @@ public class HashShuffleAppender implements Appender {
@Override
public void init() throws IOException {
- currentPage = new Pair(0L, 0);
- taskTupleIndexes = new HashMap<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>();
+ currentPage = new long[2];
+ taskTupleIndexes = new HashMap<TaskAttemptId, PrimitiveLongs>();
rowNumInPage = 0;
}
@@ -96,16 +101,16 @@ public class HashShuffleAppender implements Appender {
int writtenBytes = (int)(posAfterWritten - currentPos);
int nextRowNum = rowNumInPage + tuples.size();
- List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId);
+ PrimitiveLongs taskIndexes = taskTupleIndexes.get(taskId);
if (taskIndexes == null) {
- taskIndexes = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
+ taskIndexes = new PrimitiveLongs(100);
taskTupleIndexes.put(taskId, taskIndexes);
}
- taskIndexes.add(
- new Pair<Long, Pair<Integer, Integer>>(currentPage.getFirst(), new Pair(rowNumInPage, nextRowNum)));
+ taskIndexes.add(currentPage[0]);
+ taskIndexes.add(NumberUtil.mergeToLong(rowNumInPage, nextRowNum));
rowNumInPage = nextRowNum;
- if (posAfterWritten - currentPage.getFirst() > pageSize) {
+ if (posAfterWritten - currentPage[0] > pageSize) {
nextPage(posAfterWritten);
rowNumInPage = 0;
}
@@ -124,9 +129,9 @@ public class HashShuffleAppender implements Appender {
}
private void nextPage(long pos) {
- currentPage.setSecond((int) (pos - currentPage.getFirst()));
+ currentPage[1] = pos - currentPage[0];
pages.add(currentPage);
- currentPage = new Pair(pos, 0);
+ currentPage = new long[] {pos, 0};
}
@Override
@@ -157,16 +162,18 @@ public class HashShuffleAppender implements Appender {
}
appender.flush();
offset = appender.getOffset();
- if (offset > currentPage.getFirst()) {
+ if (offset > currentPage[0]) {
nextPage(offset);
}
appender.close();
if (LOG.isDebugEnabled()) {
- if (!pages.isEmpty()) {
- LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()
- + ", lastPage=" + pages.get(pages.size() - 1));
+ int size = pages.size();
+ if (size > 0) {
+ long[] array = pages.backingArray();
+ LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + size
+ + ", lastPage=" + array[size - 2] + ", " + array[size - 1]);
} else {
- LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size());
+ LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + size);
}
}
closed.set(true);
@@ -185,22 +192,48 @@ public class HashShuffleAppender implements Appender {
}
}
- public List<Pair<Long, Integer>> getPages() {
+ public PrimitiveLongs getPages() {
return pages;
}
- public Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() {
+ public Map<TaskAttemptId, PrimitiveLongs> getTaskTupleIndexes() {
return taskTupleIndexes;
}
- public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() {
- List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
-
- for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: taskTupleIndexes.values()) {
- merged.addAll(eachFailureIndex);
- }
+ public Iterable<Long> getMergedTupleIndexes() {
+ return getIterable(taskTupleIndexes.values());
+ }
- return merged;
+ public Iterable<Long> getIterable(final Collection<PrimitiveLongs> values) {
+ return new Iterable<Long>() {
+ @Override
+ public Iterator<Long> iterator() {
+ final Iterator<PrimitiveLongs> iterator1 = values.iterator();
+ return new Iterator<Long>() {
+ Iterator<Long> iterator2 = null;
+ @Override
+ public boolean hasNext() {
+ while (iterator2 == null || !iterator2.hasNext()) {
+ if (!iterator1.hasNext()) {
+ return false;
+ }
+ iterator2 = iterator1.next().iterator();
+ }
+ return true;
+ }
+
+ @Override
+ public Long next() {
+ return iterator2.next();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
}
public void taskFinished(TaskAttemptId taskId) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/fab63900/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index d2e9b4d..4635b76 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -176,14 +176,14 @@ public class HashShuffleAppenderManager {
private long volume;
//[<page start offset,<task start, task end>>]
- private Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes;
+ private Iterable<Long> failureTskTupleIndexes;
//[<page start offset, length>]
- private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+ private Iterable<Long> pages;
public HashShuffleIntermediate(int partId, long volume,
- List<Pair<Long, Integer>> pages,
- Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes) {
+ Iterable<Long> pages,
+ Iterable<Long> failureTskTupleIndexes) {
this.partId = partId;
this.volume = volume;
this.failureTskTupleIndexes = failureTskTupleIndexes;
@@ -198,11 +198,11 @@ public class HashShuffleAppenderManager {
return volume;
}
- public Collection<Pair<Long, Pair<Integer, Integer>>> getFailureTskTupleIndexes() {
+ public Iterable<Long> getFailureTskTupleIndexes() {
return failureTskTupleIndexes;
}
- public List<Pair<Long, Integer>> getPages() {
+ public Iterable<Long> getPages() {
return pages;
}
}
[4/4] tajo git commit: TAJO-1594: Catalog schema is invalid for some databases. (jihoon)
Posted by ji...@apache.org.
TAJO-1594: Catalog schema is invalid for some databases. (jihoon)
Closes #563
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/410ca318
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/410ca318
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/410ca318
Branch: refs/heads/index_support
Commit: 410ca31889467b48fe77f24d0fcba37ed1f70b43
Parents: 5096081
Author: Jihoon Son <ji...@apache.org>
Authored: Fri May 8 14:54:39 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri May 8 14:55:10 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 ++
.../src/main/resources/schemas/derby/derby.xml | 2 ++
.../src/main/resources/schemas/mariadb/mariadb.xml | 17 +++++++++--------
.../src/main/resources/schemas/mysql/mysql.xml | 17 +++++++++--------
.../src/main/resources/schemas/oracle/oracle.xml | 9 ++++++---
5 files changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/410ca318/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index d448c09..67d656f 100644
--- a/CHANGES
+++ b/CHANGES
@@ -114,6 +114,8 @@ Release 0.11.0 - unreleased
BUG FIXES
+ TAJO-1594: Catalog schema is invalid for some databases. (jihoon)
+
TAJO-1556: "insert into select" with reordered column list does not work.
(Contributed by Yongjin Choi, Committed by jihoon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/410ca318/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
index f3431e4..203e492 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
@@ -117,6 +117,8 @@
NULL_ORDERS VARCHAR(128) NOT NULL, -- array of null orderings
IS_UNIQUE BOOLEAN NOT NULL,
IS_CLUSTERED BOOLEAN NOT NULL,
+ FOREIGN KEY (DB_ID) REFERENCES DATABASES_ (DB_ID) ON DELETE CASCADE,
+ FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE,
CONSTRAINT INDEXES_PK PRIMARY KEY (INDEX_ID),
CONSTRAINT C_INDEXES_UNIQ UNIQUE (DB_ID, INDEX_NAME)
)]]>
http://git-wip-us.apache.org/repos/asf/tajo/blob/410ca318/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
index 79ccd0a..cee060b 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
@@ -94,20 +94,21 @@
<tns:Object order="6" type="table" name="INDEXES">
<tns:sql><![CDATA[
CREATE TABLE INDEXES (
+ INDEX_ID INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
DB_ID INT NOT NULL,
TID INT NOT NULL,
- INDEX_NAME VARCHAR(128) BINARY NOT NULL,
- COLUMN_NAME VARCHAR(128) BINARY NOT NULL,
- DATA_TYPE VARCHAR(128) NOT NULL,
- INDEX_TYPE CHAR(32) NOT NULL,
+ INDEX_NAME VARCHAR(128) NOT NULL,
+ INDEX_TYPE CHAR(32) BINARY NOT NULL,
+ PATH VARCHAR(4096) NOT NULL,
+ COLUMN_NAMES VARCHAR(256) NOT NULL, -- array of column names
+ DATA_TYPES VARCHAR(128) NOT NULL, -- array of column types
+ ORDERS VARCHAR(128) NOT NULL, -- array of column orders
+ NULL_ORDERS VARCHAR(128) NOT NULL, -- array of null orderings
IS_UNIQUE BOOLEAN NOT NULL,
IS_CLUSTERED BOOLEAN NOT NULL,
- IS_ASCENDING BOOLEAN NOT NULL,
- PRIMARY KEY (DB_ID, INDEX_NAME),
FOREIGN KEY (DB_ID) REFERENCES DATABASES_ (DB_ID) ON DELETE CASCADE,
FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE,
- UNIQUE INDEX IDX_DB_ID_NAME (DB_ID, INDEX_NAME),
- INDEX IDX_TID_COLUMN_NAME (TID, COLUMN_NAME)
+ UNIQUE INDEX IDX_DB_ID_NAME (DB_ID, INDEX_NAME)
)]]>
</tns:sql>
</tns:Object>
http://git-wip-us.apache.org/repos/asf/tajo/blob/410ca318/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
index 34337fb..3a1bc0a 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
@@ -95,20 +95,21 @@
<tns:Object order="6" type="table" name="INDEXES">
<tns:sql><![CDATA[
CREATE TABLE INDEXES (
+ INDEX_ID INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
DB_ID INT NOT NULL,
TID INT NOT NULL,
- INDEX_NAME VARCHAR(128) BINARY NOT NULL,
- COLUMN_NAME VARCHAR(128) BINARY NOT NULL,
- DATA_TYPE VARCHAR(128) NOT NULL,
- INDEX_TYPE CHAR(32) NOT NULL,
+ INDEX_NAME VARCHAR(128) NOT NULL,
+ INDEX_TYPE CHAR(32) BINARY NOT NULL,
+ PATH VARCHAR(4096) NOT NULL,
+ COLUMN_NAMES VARCHAR(256) NOT NULL, -- array of column names
+ DATA_TYPES VARCHAR(128) NOT NULL, -- array of column types
+ ORDERS VARCHAR(128) NOT NULL, -- array of column orders
+ NULL_ORDERS VARCHAR(128) NOT NULL, -- array of null orderings
IS_UNIQUE BOOLEAN NOT NULL,
IS_CLUSTERED BOOLEAN NOT NULL,
- IS_ASCENDING BOOLEAN NOT NULL,
- PRIMARY KEY (DB_ID, INDEX_NAME),
FOREIGN KEY (DB_ID) REFERENCES DATABASES_ (DB_ID) ON DELETE CASCADE,
FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE,
- UNIQUE INDEX IDX_DB_ID_NAME (DB_ID, INDEX_NAME),
- INDEX IDX_TID_COLUMN_NAME (TID, COLUMN_NAME)
+ UNIQUE INDEX IDX_DB_ID_NAME (DB_ID, INDEX_NAME)
)]]>
</tns:sql>
</tns:Object>
http://git-wip-us.apache.org/repos/asf/tajo/blob/410ca318/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
index b83d899..5b5d17d 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
@@ -146,16 +146,19 @@
<tns:Object order="14" type="table" name="INDEXES">
<tns:sql><![CDATA[
CREATE TABLE INDEXES (
+ INDEX_ID NUMBER(10) NOT NULL PRIMARY KEY,
DB_ID INT NOT NULL,
TID INT NOT NULL,
INDEX_NAME VARCHAR2(128) NOT NULL,
- COLUMN_NAME VARCHAR2(128) NOT NULL,
- DATA_TYPE VARCHAR2(128) NOT NULL,
INDEX_TYPE CHAR(32) NOT NULL,
+ PATH VARCHAR(4096) NOT NULL,
+ COLUMN_NAMES VARCHAR(256) NOT NULL, -- array of column names
+ DATA_TYPES VARCHAR(128) NOT NULL, -- array of column types
+ ORDERS VARCHAR(128) NOT NULL, -- array of column orders
+ NULL_ORDERS VARCHAR(128) NOT NULL, -- array of null orderings
IS_UNIQUE CHAR NOT NULL,
IS_CLUSTERED CHAR NOT NULL,
IS_ASCENDING CHAR NOT NULL,
- CONSTRAINT INDEXES_PKEY PRIMARY KEY (DB_ID, INDEX_NAME),
FOREIGN KEY (DB_ID) REFERENCES DATABASES_ (DB_ID) ON DELETE CASCADE,
FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE
)]]>
[2/4] tajo git commit: TAJO-1523 ClassSize should consider compressed oops
Posted by ji...@apache.org.
TAJO-1523 ClassSize should consider compressed oops
Signed-off-by: Hyunsik Choi <hy...@apache.org>
Closes #508
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/2b4c1610
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/2b4c1610
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/2b4c1610
Branch: refs/heads/index_support
Commit: 2b4c1610d2e60dfc9c50fbfdac22e12870c17163
Parents: fab6390
Author: navis.ryu <na...@apache.org>
Authored: Sat Apr 4 16:56:05 2015 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu May 7 15:30:08 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/tajo/util/ClassSize.java | 18 ++++++++++---
.../java/org/apache/tajo/util/UnsafeUtil.java | 28 ++++++++++++++++++++
2 files changed, 42 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/2b4c1610/tajo-common/src/main/java/org/apache/tajo/util/ClassSize.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ClassSize.java b/tajo-common/src/main/java/org/apache/tajo/util/ClassSize.java
index 708eae9..ab2fdf5 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/ClassSize.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/ClassSize.java
@@ -22,6 +22,7 @@ package org.apache.tajo.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import sun.misc.Unsafe;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
@@ -141,10 +142,19 @@ public class ClassSize {
*/
static {
//Default value is set to 8, covering the case when arcModel is unknown
- if (is32BitJVM()) {
- REFERENCE = 4;
- } else {
- REFERENCE = 8;
+ switch (UnsafeUtil.ADDRESS_MODE) {
+ case MEM_32BIT:
+ REFERENCE = 4;
+ break;
+ case MEM_64BIT:
+ REFERENCE = 8;
+ break;
+ case MEM_64BIT_COMPRESSED_OOPS:
+ REFERENCE = 4;
+ break;
+ default:
+ REFERENCE = is32BitJVM() ? 4 : 8;
+ break;
}
OBJECT = 2 * REFERENCE;
http://git-wip-us.apache.org/repos/asf/tajo/blob/2b4c1610/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
index 9d6b9b3..ff6072e 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
@@ -28,6 +28,21 @@ import java.nio.ByteBuffer;
public class UnsafeUtil {
public static final Unsafe unsafe;
+ // copied from
+ // http://stackoverflow.com/questions/52353/in-java-what-is-the-best-way-to-determine-the-size-of-an-object
+ public static enum AddressMode {
+ /** Unknown address mode. Size calculations may be unreliable. */
+ UNKNOWN,
+ /** 32-bit address mode using 32-bit references. */
+ MEM_32BIT,
+ /** 64-bit address mode using 64-bit references. */
+ MEM_64BIT,
+ /** 64-bit address mode using 32-bit compressed references. */
+ MEM_64BIT_COMPRESSED_OOPS
+ }
+
+ public static final AddressMode ADDRESS_MODE;
+
// offsets
public static final int ARRAY_BOOLEAN_BASE_OFFSET;
public static final int ARRAY_BYTE_BASE_OFFSET;
@@ -83,6 +98,19 @@ public class UnsafeUtil {
ARRAY_FLOAT_INDEX_SCALE = unsafe.arrayIndexScale(float[].class);
ARRAY_DOUBLE_INDEX_SCALE = unsafe.arrayIndexScale(double[].class);
ARRAY_OBJECT_INDEX_SCALE = unsafe.arrayIndexScale(Object[].class);
+
+ int addressSize = unsafe.addressSize();
+ int referenceSize = unsafe.arrayIndexScale(Object[].class);
+
+ if (addressSize == 4) {
+ ADDRESS_MODE = AddressMode.MEM_32BIT;
+ } else if (addressSize == 8 && referenceSize == 8) {
+ ADDRESS_MODE = AddressMode.MEM_64BIT;
+ } else if (addressSize == 8 && referenceSize == 4) {
+ ADDRESS_MODE = AddressMode.MEM_64BIT_COMPRESSED_OOPS;
+ } else {
+ ADDRESS_MODE = AddressMode.UNKNOWN;
+ }
}
public static int alignedSize(int size) {
[3/4] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Posted by ji...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/5096081a
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/5096081a
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/5096081a
Branch: refs/heads/index_support
Commit: 5096081a3561c477df75f16f3041441e9437ef71
Parents: 42bcf2d 2b4c161
Author: Jihoon Son <ji...@apache.org>
Authored: Fri May 8 14:53:14 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri May 8 14:53:14 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../java/org/apache/tajo/util/ClassSize.java | 18 +++-
.../java/org/apache/tajo/util/NumberUtil.java | 54 ++++++++++++
.../java/org/apache/tajo/util/UnsafeUtil.java | 28 +++++++
.../apache/tajo/querymaster/Repartitioner.java | 18 ++--
.../java/org/apache/tajo/querymaster/Task.java | 56 ++++++-------
.../tajo/worker/ExecutionBlockContext.java | 32 ++-----
.../src/main/proto/TajoWorkerProtocol.proto | 16 +---
.../apache/tajo/master/TestRepartitioner.java | 77 +++++++----------
.../tajo/querymaster/TestIntermediateEntry.java | 24 +++---
.../tajo/storage/HashShuffleAppender.java | 87 ++++++++++++++------
.../storage/HashShuffleAppenderManager.java | 12 +--
12 files changed, 247 insertions(+), 178 deletions(-)
----------------------------------------------------------------------