You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/12/24 10:18:25 UTC
[2/2] tajo git commit: TAJO-1950: Query master uses too much memory during range shuffle.
TAJO-1950: Query master uses too much memory during range shuffle.
Closes #884
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1f9ae1da
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1f9ae1da
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1f9ae1da
Branch: refs/heads/master
Commit: 1f9ae1da0424731567cea18e975c47d4479b0ae9
Parents: e8ee7f2
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Dec 24 18:17:56 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Dec 24 18:17:56 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../java/org/apache/tajo/conf/TajoConf.java | 4 +
.../apache/tajo/ha/TestHAServiceHDFSImpl.java | 4 +-
.../apache/tajo/master/TestRepartitioner.java | 81 ++---
.../org/apache/tajo/worker/TestFetcher.java | 2 +-
.../apache/tajo/engine/query/TaskRequest.java | 6 +-
.../tajo/engine/query/TaskRequestImpl.java | 13 +-
.../org/apache/tajo/engine/utils/TupleUtil.java | 32 --
.../tajo/querymaster/DefaultTaskScheduler.java | 13 +-
.../tajo/querymaster/FetchScheduleEvent.java | 7 +-
.../apache/tajo/querymaster/Repartitioner.java | 206 ++++++++-----
.../java/org/apache/tajo/querymaster/Stage.java | 3 +-
.../java/org/apache/tajo/querymaster/Task.java | 48 ++-
.../tajo/worker/ExecutionBlockContext.java | 68 ++++-
.../java/org/apache/tajo/worker/FetchImpl.java | 140 ++++-----
.../java/org/apache/tajo/worker/Fetcher.java | 45 ++-
.../java/org/apache/tajo/worker/TaskImpl.java | 174 ++++++-----
tajo-core/src/main/proto/ResourceProtos.proto | 14 +-
.../src/main/resources/webapps/worker/task.jsp | 17 +-
.../tajo/pullserver/HttpDataServerHandler.java | 6 +-
.../tajo/pullserver/TajoPullServerService.java | 302 ++++++++++++++-----
.../apache/tajo/storage/index/bst/BSTIndex.java | 54 +++-
22 files changed, 811 insertions(+), 430 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 2edf9d7..c02bfc7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.0 - unreleased
IMPROVEMENT
+ TAJO-1950: Query master uses too much memory during range shuffle. (jihoon)
+
TAJO-1858: Aligning error message in execute query page of web UI is needed.
(Byunghwa Yun via jihoon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 9ab3dfa..29cf9ee 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -212,6 +212,10 @@ public class TajoConf extends Configuration {
// Shuffle Configuration --------------------------------------------------
PULLSERVER_PORT("tajo.pullserver.port", 0, Validators.range("0", "65535")),
+ PULLSERVER_CACHE_SIZE("tajo.pullserver.index-cache.size", 10000, Validators.min("1")),
+ PULLSERVER_CACHE_TIMEOUT("tajo.pullserver.index-cache.timeout-min", 5, Validators.min("1")),
+ PULLSERVER_FETCH_URL_MAX_LENGTH("tajo.pullserver.fetch-url.max-length", StorageUnit.KB,
+ Validators.min("1")),
SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()),
SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", BuiltinStorages.RAW, Validators.javaString()),
SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num",
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java b/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
index 279fce7..81eeb1f 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
@@ -88,7 +88,9 @@ public class TestHAServiceHDFSImpl {
assertEquals(2, fs.listStatus(activePath).length);
assertEquals(0, fs.listStatus(backupPath).length);
} finally {
- backupMaster.stop();
+ if (backupMaster != null) {
+ backupMaster.stop();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index b0a3a17..c260ab6 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -29,13 +29,15 @@ import org.apache.tajo.TestTajoIds;
import org.apache.tajo.querymaster.Repartitioner;
import org.apache.tajo.querymaster.Task;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
+import org.apache.tajo.querymaster.Task.PullHost;
+import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.Pair;
-import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.FetchImpl;
import org.junit.Test;
import java.net.URI;
import java.util.*;
+import java.util.stream.Collectors;
import static junit.framework.Assert.assertEquals;
import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
@@ -70,11 +72,9 @@ public class TestRepartitioner {
new HashMap<>();
for (Map.Entry<Integer, List<IntermediateEntry>> eachEntry: intermediateEntries.entrySet()) {
- FetchImpl fetch = new FetchImpl(new Task.PullHost(hostName, port), ShuffleType.HASH_SHUFFLE,
+ FetchImpl fetch = new FetchImpl(sid.toString(), new Task.PullHost(hostName, port), ShuffleType.HASH_SHUFFLE,
sid, eachEntry.getKey(), eachEntry.getValue());
- fetch.setName(sid.toString());
-
FetchProto proto = fetch.getProto();
fetch = new FetchImpl(proto);
assertEquals(proto, fetch.getProto());
@@ -84,7 +84,7 @@ public class TestRepartitioner {
hashEntries.put(eachEntry.getKey(), ebEntries);
- List<URI> uris = fetch.getURIs();
+ List<URI> uris = Repartitioner.createFullURIs(2 * StorageUnit.KB, fetch.getProto());
assertEquals(1, uris.size()); //In Hash Suffle, Fetcher return only one URI per partition.
URI uri = uris.get(0);
@@ -119,7 +119,7 @@ public class TestRepartitioner {
ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
FetchImpl [] fetches = new FetchImpl[12];
for (int i = 0; i < 12; i++) {
- fetches[i] = new FetchImpl(new Task.PullHost("localhost", 10000 + i), HASH_SHUFFLE, ebId, i / 2);
+ fetches[i] = new FetchImpl(tableName, new Task.PullHost("localhost", 10000 + i), HASH_SHUFFLE, ebId, i / 2);
}
int [] VOLUMES = {100, 80, 70, 30, 10, 5};
@@ -128,37 +128,40 @@ public class TestRepartitioner {
fetchGroups.put(i, new FetchGroupMeta(VOLUMES[i / 2], fetches[i]).addFetche(fetches[i + 1]));
}
- Pair<Long [], Map<String, List<FetchImpl>>[]> results;
+ FetchProto[] expectedProtos = new FetchProto[fetches.length];
+ expectedProtos = Arrays.stream(fetches).map(fetch -> fetch.getProto()).collect(Collectors.toList())
+ .toArray(expectedProtos);
+ Pair<Long [], Map<String, List<FetchProto>>[]> results;
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 1);
long expected [] = {100 + 80 + 70 + 30 + 10 + 5};
assertFetchVolumes(expected, results.getFirst());
- assertFetchImpl(fetches, results.getSecond());
+ assertFetchProto(expectedProtos, results.getSecond());
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 2);
long expected0 [] = {140, 155};
assertFetchVolumes(expected0, results.getFirst());
- assertFetchImpl(fetches, results.getSecond());
+ assertFetchProto(expectedProtos, results.getSecond());
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 3);
long expected1 [] = {100, 95, 100};
assertFetchVolumes(expected1, results.getFirst());
- assertFetchImpl(fetches, results.getSecond());
+ assertFetchProto(expectedProtos, results.getSecond());
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 4);
long expected2 [] = {100, 80, 70, 45};
assertFetchVolumes(expected2, results.getFirst());
- assertFetchImpl(fetches, results.getSecond());
+ assertFetchProto(expectedProtos, results.getSecond());
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 5);
long expected3 [] = {100, 80, 70, 30, 15};
assertFetchVolumes(expected3, results.getFirst());
- assertFetchImpl(fetches, results.getSecond());
+ assertFetchProto(expectedProtos, results.getSecond());
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 6);
long expected4 [] = {100, 80, 70, 30, 10, 5};
assertFetchVolumes(expected4, results.getFirst());
- assertFetchImpl(fetches, results.getSecond());
+ assertFetchProto(expectedProtos, results.getSecond());
}
private static void assertFetchVolumes(long [] expected, Long [] results) {
@@ -191,7 +194,8 @@ public class TestRepartitioner {
}
long splitVolume = 128 * 1024 * 1024;
- List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+ ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+ List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, intermediateEntries,
splitVolume, 10 * 1024 * 1024);
assertEquals(6, fetches.size());
@@ -199,10 +203,10 @@ public class TestRepartitioner {
int index = 0;
int numZeroPosFetcher = 0;
long totalLength = 0;
- for (List<FetchImpl> eachFetchList: fetches) {
+ for (List<FetchProto> eachFetchList: fetches) {
totalInterms += eachFetchList.size();
long eachFetchVolume = 0;
- for (FetchImpl eachFetch: eachFetchList) {
+ for (FetchProto eachFetch: eachFetchList) {
eachFetchVolume += eachFetch.getLength();
if (eachFetch.getOffset() == 0) {
numZeroPosFetcher++;
@@ -248,8 +252,9 @@ public class TestRepartitioner {
intermediateEntries.add(interm);
}
+ ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
long splitVolume = 128 * 1024 * 1024;
- List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+ List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, intermediateEntries,
splitVolume, 10 * 1024 * 1024);
assertEquals(32, fetches.size());
@@ -258,15 +263,15 @@ public class TestRepartitioner {
long totalLength = 0;
Set<String> uniqPullHost = new HashSet<>();
- for (List<FetchImpl> eachFetchList: fetches) {
+ for (List<FetchProto> eachFetchList: fetches) {
long length = 0;
- for (FetchImpl eachFetch: eachFetchList) {
+ for (FetchProto eachFetch: eachFetchList) {
if (eachFetch.getOffset() == 0) {
numZeroPosFetcher++;
}
totalLength += eachFetch.getLength();
length += eachFetch.getLength();
- uniqPullHost.add(eachFetch.getPullHost().toString());
+ uniqPullHost.add(new PullHost(eachFetch.getHost(), eachFetch.getPort()).toString());
}
assertTrue(length + " should be smaller than splitVolume", length < splitVolume);
if (index < fetches.size() - 1) {
@@ -378,7 +383,8 @@ public class TestRepartitioner {
}
long splitVolume = 256 * 1024 * 1024;
- List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, entries, splitVolume,
+ ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+ List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, entries, splitVolume,
10 * 1024 * 1024);
@@ -393,13 +399,13 @@ public class TestRepartitioner {
{728355084,121760359},
};
int index = 0;
- for (List<FetchImpl> eachFetchList: fetches) {
+ for (List<FetchProto> eachFetchList: fetches) {
if (index == 3) {
assertEquals(2, eachFetchList.size());
} else {
assertEquals(1, eachFetchList.size());
}
- for (FetchImpl eachFetch: eachFetchList) {
+ for (FetchProto eachFetch: eachFetchList) {
assertEquals(expected[index][0], eachFetch.getOffset());
assertEquals(expected[index][1], eachFetch.getLength());
index++;
@@ -438,13 +444,14 @@ public class TestRepartitioner {
}
long splitVolume = 128 * 1024 * 1024;
- List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+ ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+ List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, intermediateEntries,
splitVolume, 10 * 1024 * 1024);
assertEquals(32, fetches.size());
int expectedSize = 0;
- Set<FetchImpl> fetchSet = new HashSet<>();
- for(List<FetchImpl> list : fetches){
+ Set<FetchProto> fetchSet = new HashSet<>();
+ for(List<FetchProto> list : fetches){
expectedSize += list.size();
fetchSet.addAll(list);
}
@@ -456,15 +463,15 @@ public class TestRepartitioner {
long totalLength = 0;
Set<String> uniqPullHost = new HashSet<>();
- for (List<FetchImpl> eachFetchList: fetches) {
+ for (List<FetchProto> eachFetchList: fetches) {
long length = 0;
- for (FetchImpl eachFetch: eachFetchList) {
+ for (FetchProto eachFetch: eachFetchList) {
if (eachFetch.getOffset() == 0) {
numZeroPosFetcher++;
}
totalLength += eachFetch.getLength();
length += eachFetch.getLength();
- uniqPullHost.add(eachFetch.getPullHost().toString());
+ uniqPullHost.add(new PullHost(eachFetch.getHost(), eachFetch.getPort()).toString());
}
assertTrue(length + " should be smaller than splitVolume", length < splitVolume);
if (index < fetches.size() - 1) {
@@ -482,25 +489,25 @@ public class TestRepartitioner {
ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
Task.PullHost pullHost = new Task.PullHost("localhost", 0);
- FetchImpl expected = new FetchImpl(pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
- FetchImpl fetch2 = new FetchImpl(pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
+ FetchImpl expected = new FetchImpl("name", pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
+ FetchImpl fetch2 = new FetchImpl("name", pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
assertEquals(expected, fetch2);
fetch2.setOffset(5);
fetch2.setLength(10);
assertNotEquals(expected, fetch2);
}
- private static void assertFetchImpl(FetchImpl [] expected, Map<String, List<FetchImpl>>[] result) {
- Set<FetchImpl> expectedURLs = Sets.newHashSet();
+ private static void assertFetchProto(FetchProto [] expected, Map<String, List<FetchProto>>[] result) {
+ Set<FetchProto> expectedURLs = Sets.newHashSet();
- for (FetchImpl f : expected) {
+ for (FetchProto f : expected) {
expectedURLs.add(f);
}
- Set<FetchImpl> resultURLs = Sets.newHashSet();
+ Set<FetchProto> resultURLs = Sets.newHashSet();
- for (Map<String, List<FetchImpl>> e : result) {
- for (List<FetchImpl> list : e.values()) {
+ for (Map<String, List<FetchProto>> e : result) {
+ for (List<FetchProto> list : e.values()) {
resultURLs.addAll(list);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
index a91fc30..dfc37b0 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -90,7 +90,7 @@ public class TestFetcher {
FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
storeChunk.setFromRemote(true);
final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
- FileChunk chunk = fetcher.get();
+ FileChunk chunk = fetcher.get().get(0);
assertNotNull(chunk);
assertNotNull(chunk.getFile());
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
index 48d4780..ef4ff60 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
@@ -21,6 +21,7 @@
*/
package org.apache.tajo.engine.query;
+import org.apache.tajo.ResourceProtos.FetchProto;
import org.apache.tajo.ResourceProtos.TaskRequestProto;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -29,7 +30,6 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.plan.serder.PlanProto;
-import org.apache.tajo.worker.FetchImpl;
import java.util.List;
@@ -39,8 +39,8 @@ public interface TaskRequest extends ProtoObject<TaskRequestProto> {
List<CatalogProtos.FragmentProto> getFragments();
PlanProto.LogicalNodeTree getPlan();
void setInterQuery();
- void addFetch(String name, FetchImpl fetch);
- List<FetchImpl> getFetches();
+ void addFetch(FetchProto fetch);
+ List<FetchProto> getFetches();
QueryContext getQueryContext(TajoConf conf);
DataChannel getDataChannel();
Enforcer getEnforcer();
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
index 7b52dab..fc7556c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
@@ -42,7 +42,7 @@ public class TaskRequestImpl implements TaskRequest {
private boolean clusteredOutput;
private PlanProto.LogicalNodeTree plan; // logical node
private Boolean interQuery;
- private List<FetchImpl> fetches;
+ private List<FetchProto> fetches;
private QueryContext queryContext;
private DataChannel dataChannel;
private Enforcer enforcer;
@@ -157,10 +157,10 @@ public class TaskRequestImpl implements TaskRequest {
this.interQuery = true;
}
- public void addFetch(String name, FetchImpl fetch) {
+ @Override
+ public void addFetch(FetchProto fetch) {
maybeInitBuilder();
initFetches();
- fetch.setName(name);
fetches.add(fetch);
}
@@ -212,7 +212,8 @@ public class TaskRequestImpl implements TaskRequest {
return this.enforcer;
}
- public List<FetchImpl> getFetches() {
+ @Override
+ public List<FetchProto> getFetches() {
initFetches();
return this.fetches;
@@ -225,7 +226,7 @@ public class TaskRequestImpl implements TaskRequest {
TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
this.fetches = new ArrayList<>();
for(FetchProto fetch : p.getFetchesList()) {
- fetches.add(new FetchImpl(fetch));
+ fetches.add(fetch);
}
}
@@ -259,7 +260,7 @@ public class TaskRequestImpl implements TaskRequest {
}
if (this.fetches != null) {
for (int i = 0; i < fetches.size(); i++) {
- builder.addFetches(fetches.get(i).getProto());
+ builder.addFetches(fetches.get(i));
}
}
if (this.queryMasterHostAndPort != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index bae04c8..31c23f4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -20,7 +20,6 @@ package org.apache.tajo.engine.utils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.Column;
@@ -29,47 +28,16 @@ import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.VTuple;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
import java.util.List;
import java.util.Map;
public class TupleUtil {
private static final Log LOG = LogFactory.getLog(TupleUtil.class);
- public static String rangeToQuery(Schema schema, TupleRange range, boolean last)
- throws UnsupportedEncodingException {
- return rangeToQuery(range, last, RowStoreUtil.createEncoder(schema));
- }
-
- public static String rangeToQuery(TupleRange range, boolean last, RowStoreEncoder encoder)
- throws UnsupportedEncodingException {
- StringBuilder sb = new StringBuilder();
- byte [] firstKeyBytes = encoder.toBytes(range.getStart());
- byte [] endKeyBytes = encoder.toBytes(range.getEnd());
-
- String firstKeyBase64 = new String(Base64.encodeBase64(firstKeyBytes));
- String lastKeyBase64 = new String(Base64.encodeBase64(endKeyBytes));
-
- sb.append("start=")
- .append(URLEncoder.encode(firstKeyBase64, "utf-8"))
- .append("&")
- .append("end=")
- .append(URLEncoder.encode(lastKeyBase64, "utf-8"));
-
- if (last) {
- sb.append("&final=true");
- }
-
- return sb.toString();
- }
-
/**
* if max value is null, set ranges[last]
* @param sortSpecs
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
index f1c0f62..26dc103 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
@@ -52,7 +52,6 @@ import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.RpcParameterFactory;
import org.apache.tajo.util.TUtil;
-import org.apache.tajo.worker.FetchImpl;
import java.net.InetSocketAddress;
import java.util.*;
@@ -233,11 +232,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
}
} else if (event instanceof FetchScheduleEvent) {
FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
- Map<String, List<FetchImpl>> fetches = castEvent.getFetches();
+ Map<String, List<FetchProto>> fetches = castEvent.getFetches();
TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext();
Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++);
scheduledObjectNum++;
- for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) {
+ for (Entry<String, List<FetchProto>> eachFetch : fetches.entrySet()) {
task.addFetches(eachFetch.getKey(), eachFetch.getValue());
task.addFragment(fragmentsForNonLeafTask[0], true);
if (fragmentsForNonLeafTask[1] != null) {
@@ -983,11 +982,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
taskAssign.setInterQuery();
}
- for(Map.Entry<String, Set<FetchImpl>> entry: task.getFetchMap().entrySet()) {
- Collection<FetchImpl> fetches = entry.getValue();
+ for(Map.Entry<String, Set<FetchProto>> entry: task.getFetchMap().entrySet()) {
+ Collection<FetchProto> fetches = entry.getValue();
if (fetches != null) {
- for (FetchImpl fetch : fetches) {
- taskAssign.addFetch(entry.getKey(), fetch);
+ for (FetchProto fetch : fetches) {
+ taskAssign.addFetch(fetch);
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
index 5fe2f80..e4e63b4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
@@ -19,6 +19,7 @@
package org.apache.tajo.querymaster;
import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ResourceProtos.FetchProto;
import org.apache.tajo.master.event.TaskSchedulerEvent;
import org.apache.tajo.worker.FetchImpl;
@@ -26,15 +27,15 @@ import java.util.List;
import java.util.Map;
public class FetchScheduleEvent extends TaskSchedulerEvent {
- private final Map<String, List<FetchImpl>> fetches;
+ private final Map<String, List<FetchProto>> fetches; // map of table name and fetch list
public FetchScheduleEvent(final EventType eventType, final ExecutionBlockId blockId,
- final Map<String, List<FetchImpl>> fetches) {
+ final Map<String, List<FetchProto>> fetches) {
super(eventType, blockId);
this.fetches = fetches;
}
- public Map<String, List<FetchImpl>> getFetches() {
+ public Map<String, List<FetchProto>> getFetches() {
return fetches;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index e64cd51..bd8311f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -25,8 +25,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ResourceProtos.FetchProto;
import org.apache.tajo.SessionVars;
import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.annotation.NotNull;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStats;
@@ -48,7 +50,9 @@ import org.apache.tajo.plan.logical.SortNode.SortPurpose;
import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
+import org.apache.tajo.querymaster.Task.PullHost;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
@@ -57,13 +61,17 @@ import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.worker.FetchImpl;
+import org.apache.tajo.worker.FetchImpl.RangeParam;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
import java.net.URI;
+import java.net.URLEncoder;
import java.util.*;
import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*;
@@ -75,7 +83,6 @@ import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*;
public class Repartitioner {
private static final Log LOG = LogFactory.getLog(Repartitioner.class);
- private final static int HTTP_REQUEST_MAXIMUM_LENGTH = 1900;
private final static String UNKNOWN_HOST = "unknown";
public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, Stage stage)
@@ -546,12 +553,13 @@ public class Repartitioner {
private static void addJoinShuffle(Stage stage, int partitionId,
Map<ExecutionBlockId, List<IntermediateEntry>> grouppedPartitions) {
- Map<String, List<FetchImpl>> fetches = new HashMap<>();
+ Map<String, List<FetchProto>> fetches = new HashMap<>();
for (ExecutionBlock execBlock : stage.getMasterPlan().getChilds(stage.getId())) {
if (grouppedPartitions.containsKey(execBlock.getId())) {
- Collection<FetchImpl> requests = mergeShuffleRequest(partitionId, HASH_SHUFFLE,
+ String name = execBlock.getId().toString();
+ List<FetchProto> requests = mergeShuffleRequest(name, partitionId, HASH_SHUFFLE,
grouppedPartitions.get(execBlock.getId()));
- fetches.put(execBlock.getId().toString(), Lists.newArrayList(requests));
+ fetches.put(name, requests);
}
}
@@ -568,9 +576,10 @@ public class Repartitioner {
*
* @return key: pullserver's address, value: a list of requests
*/
- private static Collection<FetchImpl> mergeShuffleRequest(int partitionId,
- ShuffleType type,
- List<IntermediateEntry> partitions) {
+ private static List<FetchProto> mergeShuffleRequest(final String fetchName,
+ final int partitionId,
+ final ShuffleType type,
+ final List<IntermediateEntry> partitions) {
// ebId + pullhost -> FetchImmpl
Map<String, FetchImpl> mergedPartitions = new HashMap<>();
@@ -582,12 +591,15 @@ public class Repartitioner {
fetch.addPart(partition.getTaskId(), partition.getAttemptId());
} else {
// In some cases like union each IntermediateEntry has different EBID.
- FetchImpl fetch = new FetchImpl(partition.getPullHost(), type, partition.getEbId(), partitionId);
+ FetchImpl fetch = new FetchImpl(fetchName, partition.getPullHost(), type, partition.getEbId(), partitionId);
fetch.addPart(partition.getTaskId(), partition.getAttemptId());
mergedPartitions.put(mergedKey, fetch);
}
}
- return mergedPartitions.values();
+
+ return mergedPartitions.values().stream()
+ .map(fetch -> fetch.getProto())
+ .collect(Collectors.toList());
}
public static void scheduleFragmentsForNonLeafTasks(TaskSchedulerContext schedulerContext,
@@ -699,44 +711,44 @@ public class Repartitioner {
new String[]{UNKNOWN_HOST});
Stage.scheduleFragment(stage, dummyFragment);
- List<FetchImpl> fetches = new ArrayList<>();
+ Map<Pair<PullHost, ExecutionBlockId>, FetchImpl> fetches = new HashMap<>();
List<ExecutionBlock> childBlocks = masterPlan.getChilds(stage.getId());
for (ExecutionBlock childBlock : childBlocks) {
Stage childExecSM = stage.getContext().getStage(childBlock.getId());
for (Task qu : childExecSM.getTasks()) {
for (IntermediateEntry p : qu.getIntermediateData()) {
- FetchImpl fetch = new FetchImpl(p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0);
- fetch.addPart(p.getTaskId(), p.getAttemptId());
- fetches.add(fetch);
+ Pair<PullHost, ExecutionBlockId> key = new Pair<>(p.getPullHost(), childBlock.getId());
+ if (fetches.containsKey(key)) {
+ fetches.get(key).addPart(p.getTaskId(), p.getAttemptId());
+ } else {
+ FetchImpl fetch = new FetchImpl(scan.getTableName(), p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0);
+ fetch.addPart(p.getTaskId(), p.getAttemptId());
+ fetches.put(key, fetch);
+ }
}
}
}
- SortedMap<TupleRange, Collection<FetchImpl>> map;
+ SortedMap<TupleRange, Collection<FetchProto>> map;
map = new TreeMap<>();
- Set<FetchImpl> fetchSet;
- try {
- RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(sortSchema);
- for (int i = 0; i < ranges.length; i++) {
- fetchSet = new HashSet<>();
- for (FetchImpl fetch: fetches) {
- String rangeParam =
- TupleUtil.rangeToQuery(ranges[i], i == (ranges.length - 1) , encoder);
- FetchImpl copy = null;
- try {
- copy = fetch.clone();
- } catch (CloneNotSupportedException e) {
- throw new RuntimeException(e);
- }
- copy.setRangeParams(rangeParam);
- fetchSet.add(copy);
+ Set<FetchProto> fetchSet;
+ RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(sortSchema);
+ for (int i = 0; i < ranges.length; i++) {
+ fetchSet = new HashSet<>();
+ RangeParam rangeParam = new RangeParam(ranges[i], i == (ranges.length - 1), encoder);
+ for (FetchImpl fetch : fetches.values()) {
+ FetchImpl copy = null;
+ try {
+ copy = fetch.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException(e);
}
- map.put(ranges[i], fetchSet);
+ copy.setRangeParams(rangeParam);
+ fetchSet.add(copy.getProto());
}
- } catch (UnsupportedEncodingException e) {
- LOG.error(e);
+ map.put(ranges[i], fetchSet);
}
scheduleFetchesByRoundRobin(stage, map, scan.getTableName(), determinedTaskNum);
@@ -744,20 +756,20 @@ public class Repartitioner {
schedulerContext.setEstimatedTaskNum(determinedTaskNum);
}
- public static void scheduleFetchesByRoundRobin(Stage stage, Map<?, Collection<FetchImpl>> partitions,
- String tableName, int num) {
+ public static void scheduleFetchesByRoundRobin(Stage stage, Map<?, Collection<FetchProto>> partitions,
+ String tableName, int num) {
int i;
- Map<String, List<FetchImpl>>[] fetchesArray = new Map[num];
+ Map<String, List<FetchProto>>[] fetchesArray = new Map[num];
for (i = 0; i < num; i++) {
fetchesArray[i] = new HashMap<>();
}
i = 0;
- for (Entry<?, Collection<FetchImpl>> entry : partitions.entrySet()) {
- Collection<FetchImpl> value = entry.getValue();
+ for (Entry<?, Collection<FetchProto>> entry : partitions.entrySet()) {
+ Collection<FetchProto> value = entry.getValue();
TUtil.putCollectionToNestedList(fetchesArray[i++], tableName, value);
if (i == num) i = 0;
}
- for (Map<String, List<FetchImpl>> eachFetches : fetchesArray) {
+ for (Map<String, List<FetchProto>> eachFetches : fetchesArray) {
Stage.scheduleFetches(stage, eachFetches);
}
}
@@ -785,6 +797,10 @@ public class Repartitioner {
return totalVolume;
}
+ public List<FetchProto> getFetchProtos() {
+ return fetchUrls.stream().map(fetch -> fetch.getProto()).collect(Collectors.toList());
+ }
+
}
public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan,
@@ -821,7 +837,7 @@ public class Repartitioner {
Map<Task.PullHost, List<IntermediateEntry>> hashedByHost = hashByHost(interm.getValue());
for (Entry<Task.PullHost, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
- FetchImpl fetch = new FetchImpl(e.getKey(), channel.getShuffleType(),
+ FetchImpl fetch = new FetchImpl(scan.getTableName(), e.getKey(), channel.getShuffleType(),
block.getId(), interm.getKey(), e.getValue());
long volumeSum = 0;
@@ -891,20 +907,16 @@ public class Repartitioner {
}
}
- public static Pair<Long [], Map<String, List<FetchImpl>>[]> makeEvenDistributedFetchImpl(
+ public static Pair<Long [], Map<String, List<FetchProto>>[]> makeEvenDistributedFetchImpl(
Map<Integer, FetchGroupMeta> partitions, String tableName, int num) {
// Sort fetchGroupMeta in a descending order of data volumes.
List<FetchGroupMeta> fetchGroupMetaList = Lists.newArrayList(partitions.values());
- Collections.sort(fetchGroupMetaList, new Comparator<FetchGroupMeta>() {
- @Override
- public int compare(FetchGroupMeta o1, FetchGroupMeta o2) {
- return o1.getVolume() < o2.getVolume() ? 1 : (o1.getVolume() > o2.getVolume() ? -1 : 0);
- }
- });
+ Collections.sort(fetchGroupMetaList, (o1, o2) ->
+ o1.getVolume() < o2.getVolume() ? 1 : (o1.getVolume() > o2.getVolume() ? -1 : 0));
// Initialize containers
- Map<String, List<FetchImpl>>[] fetchesArray = new Map[num];
+ Map<String, List<FetchProto>>[] fetchesArray = new Map[num];
Long [] assignedVolumes = new Long[num];
// initialization
for (int i = 0; i < num; i++) {
@@ -925,7 +937,7 @@ public class Repartitioner {
FetchGroupMeta fetchGroupMeta = iterator.next();
assignedVolumes[p] += fetchGroupMeta.getVolume();
- TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.fetchUrls);
+ TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.getFetchProtos());
p++;
}
@@ -933,13 +945,13 @@ public class Repartitioner {
while (p >= 0 && iterator.hasNext()) {
FetchGroupMeta fetchGroupMeta = iterator.next();
assignedVolumes[p] += fetchGroupMeta.getVolume();
- TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.fetchUrls);
+ TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.getFetchProtos());
// While the current one is smaller than next one, it adds additional fetches to current one.
while(iterator.hasNext() && (p > 0 && assignedVolumes[p - 1] > assignedVolumes[p])) {
FetchGroupMeta additionalFetchGroup = iterator.next();
assignedVolumes[p] += additionalFetchGroup.getVolume();
- TUtil.putCollectionToNestedList(fetchesArray[p], tableName, additionalFetchGroup.fetchUrls);
+ TUtil.putCollectionToNestedList(fetchesArray[p], tableName, additionalFetchGroup.getFetchProtos());
}
p--;
@@ -951,9 +963,9 @@ public class Repartitioner {
public static void scheduleFetchesByEvenDistributedVolumes(Stage stage, Map<Integer, FetchGroupMeta> partitions,
String tableName, int num) {
- Map<String, List<FetchImpl>>[] fetchsArray = makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond();
+ Map<String, List<FetchProto>>[] fetchsArray = makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond();
// Schedule FetchImpls
- for (Map<String, List<FetchImpl>> eachFetches : fetchsArray) {
+ for (Map<String, List<FetchProto>> eachFetches : fetchsArray) {
Stage.scheduleFetches(stage, eachFetches);
}
}
@@ -976,7 +988,7 @@ public class Repartitioner {
throw new RuntimeException("tajo.dist-query.table-partition.task-volume-mb should be great than " +
"tajo.shuffle.hash.appender.page.volumn-mb");
}
- List<List<FetchImpl>> fetches = new ArrayList<>();
+ List<List<FetchProto>> fetches = new ArrayList<>();
long totalIntermediateSize = 0L;
for (Entry<ExecutionBlockId, List<IntermediateEntry>> listEntry : intermediates.entrySet()) {
@@ -996,7 +1008,7 @@ public class Repartitioner {
// Grouping or splitting to fit $DIST_QUERY_TABLE_PARTITION_VOLUME size
for (List<IntermediateEntry> partitionEntries : partitionIntermMap.values()) {
- List<List<FetchImpl>> eachFetches = splitOrMergeIntermediates(listEntry.getKey(), partitionEntries,
+ List<List<FetchProto>> eachFetches = splitOrMergeIntermediates(tableName, listEntry.getKey(), partitionEntries,
splitVolume, pageSize);
if (eachFetches != null && !eachFetches.isEmpty()) {
fetches.addAll(eachFetches);
@@ -1007,8 +1019,8 @@ public class Repartitioner {
schedulerContext.setEstimatedTaskNum(fetches.size());
int i = 0;
- Map<String, List<FetchImpl>>[] fetchesArray = new Map[fetches.size()];
- for(List<FetchImpl> entry : fetches) {
+ Map<String, List<FetchProto>>[] fetchesArray = new Map[fetches.size()];
+ for(List<FetchProto> entry : fetches) {
fetchesArray[i] = new HashMap<>();
fetchesArray[i].put(tableName, entry);
@@ -1030,16 +1042,16 @@ public class Repartitioner {
* @param splitVolume
* @return
*/
- public static List<List<FetchImpl>> splitOrMergeIntermediates(
+ public static List<List<FetchProto>> splitOrMergeIntermediates(@NotNull String fetchName,
ExecutionBlockId ebId, List<IntermediateEntry> entries, long splitVolume, long pageSize) {
// Each List<FetchImpl> has splitVolume size.
- List<List<FetchImpl>> fetches = new ArrayList<>();
+ List<List<FetchProto>> fetches = new ArrayList<>();
Iterator<IntermediateEntry> iter = entries.iterator();
if (!iter.hasNext()) {
return null;
}
- List<FetchImpl> fetchListForSingleTask = new ArrayList<>();
+ List<FetchProto> fetchListForSingleTask = new ArrayList<>();
long fetchListVolume = 0;
while (iter.hasNext()) {
@@ -1065,11 +1077,11 @@ public class Repartitioner {
fetchListForSingleTask = new ArrayList<>();
fetchListVolume = 0;
}
- FetchImpl fetch = new FetchImpl(currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE,
+ FetchImpl fetch = new FetchImpl(fetchName, currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE,
ebId, currentInterm.getPartId(), TUtil.newList(currentInterm));
fetch.setOffset(eachSplit.getFirst());
fetch.setLength(eachSplit.getSecond());
- fetchListForSingleTask.add(fetch);
+ fetchListForSingleTask.add(fetch.getProto());
fetchListVolume += eachSplit.getSecond();
}
}
@@ -1079,19 +1091,56 @@ public class Repartitioner {
return fetches;
}
- public static List<URI> createFetchURL(FetchImpl fetch, boolean includeParts) {
+ /**
+ * Get the pull server URIs.
+ */
+ public static List<URI> createFullURIs(int maxUrlLength, FetchProto fetch) {
+ return createFetchURL(maxUrlLength, fetch, true);
+ }
+
+ /**
+ * Get the pull server URIs without repeated parameters.
+ */
+ public static List<URI> createSimpleURIs(int maxUrlLength, FetchProto fetch) {
+ return createFetchURL(maxUrlLength, fetch, false);
+ }
+
+  /**
+   * Builds the range-shuffle part of a pull server query string from the given fetch.
+   * <p>
+   * The result has the form {@code start=<key>&end=<key>}, where each key is the Base64 encoding of
+   * the corresponding range bound, URL-encoded as UTF-8. {@code &final=true} is appended when the
+   * fetch's range-last-inclusive flag is set.
+   *
+   * @param proto fetch whose range start, range end and last-inclusive flag are read
+   * @return the query string fragment, without a leading {@code &}
+   */
+  private static String getRangeParam(FetchProto proto) {
+    StringBuilder sb = new StringBuilder();
+    // Base64 output is pure ASCII; decode it with an explicit charset rather than the platform default.
+    String firstKeyBase64 = new String(org.apache.commons.codec.binary.Base64.encodeBase64(proto.getRangeStart().toByteArray()), java.nio.charset.StandardCharsets.US_ASCII);
+    String lastKeyBase64 = new String(org.apache.commons.codec.binary.Base64.encodeBase64(proto.getRangeEnd().toByteArray()), java.nio.charset.StandardCharsets.US_ASCII);
+
+    try {
+      sb.append("start=")
+          .append(URLEncoder.encode(firstKeyBase64, "utf-8"))
+          .append("&")
+          .append("end=")
+          .append(URLEncoder.encode(lastKeyBase64, "utf-8"));
+    } catch (UnsupportedEncodingException e) {
+      // "utf-8" is a mandatory JVM charset, so this is not expected to happen.
+      throw new RuntimeException(e);
+    }
+
+    if (proto.getRangeLastInclusive()) {
+      sb.append("&final=true");
+    }
+
+    return sb.toString();
+  }
+
+ public static List<URI> createFetchURL(int maxUrlLength, FetchProto fetch, boolean includeParts) {
String scheme = "http://";
StringBuilder urlPrefix = new StringBuilder(scheme);
- urlPrefix.append(fetch.getPullHost().getHost()).append(":").append(fetch.getPullHost().getPort()).append("/?")
- .append("qid=").append(fetch.getExecutionBlockId().getQueryId().toString())
- .append("&sid=").append(fetch.getExecutionBlockId().getId())
+ ExecutionBlockId ebId = new ExecutionBlockId(fetch.getExecutionBlockId());
+ urlPrefix.append(fetch.getHost()).append(":").append(fetch.getPort()).append("/?")
+ .append("qid=").append(ebId.getQueryId().toString())
+ .append("&sid=").append(ebId.getId())
.append("&p=").append(fetch.getPartitionId())
.append("&type=");
if (fetch.getType() == HASH_SHUFFLE) {
urlPrefix.append("h");
} else if (fetch.getType() == RANGE_SHUFFLE) {
- urlPrefix.append("r").append("&").append(fetch.getRangeParams());
+ urlPrefix.append("r").append("&").append(getRangeParam(fetch));
} else if (fetch.getType() == SCATTERED_HASH_SHUFFLE) {
urlPrefix.append("s");
}
@@ -1105,17 +1154,26 @@ public class Repartitioner {
if (fetch.getType() == HASH_SHUFFLE || fetch.getType() == SCATTERED_HASH_SHUFFLE) {
fetchURLs.add(URI.create(urlPrefix.toString()));
} else {
+ urlPrefix.append("&ta=");
// If the get request is longer than 2000 characters,
// the long request uri may cause HTTP Status Code - 414 Request-URI Too Long.
// Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
// The below code transforms a long request to multiple requests.
List<String> taskIdsParams = new ArrayList<>();
StringBuilder taskIdListBuilder = new StringBuilder();
- List<Integer> taskIds = fetch.getTaskIds();
- List<Integer> attemptIds = fetch.getAttemptIds();
+
+ final List<Integer> taskIds = fetch.getTaskIdList();
+ final List<Integer> attemptIds = fetch.getAttemptIdList();
+
+ // Sort task ids to increase cache hit in pull server
+ final List<Pair<Integer, Integer>> taskAndAttemptIds = IntStream.range(0, taskIds.size())
+ .mapToObj(i -> new Pair<>(taskIds.get(i), attemptIds.get(i)))
+ .sorted((p1, p2) -> p1.getFirst() - p2.getFirst())
+ .collect(Collectors.toList());
+
boolean first = true;
- for (int i = 0; i < taskIds.size(); i++) {
+ for (int i = 0; i < taskAndAttemptIds.size(); i++) {
StringBuilder taskAttemptId = new StringBuilder();
if (!first) { // when comma is added?
@@ -1124,17 +1182,16 @@ public class Repartitioner {
first = false;
}
- int taskId = taskIds.get(i);
+ int taskId = taskAndAttemptIds.get(i).getFirst();
if (taskId < 0) {
// In the case of hash shuffle each partition has single shuffle file per worker.
// TODO If file is large, consider multiple fetching(shuffle file can be split)
continue;
}
- int attemptId = attemptIds.get(i);
+ int attemptId = taskAndAttemptIds.get(i).getSecond();
taskAttemptId.append(taskId).append("_").append(attemptId);
- if (taskIdListBuilder.length() + taskAttemptId.length()
- > HTTP_REQUEST_MAXIMUM_LENGTH) {
+ if (urlPrefix.length() + taskIdListBuilder.length() > maxUrlLength) {
taskIdsParams.add(taskIdListBuilder.toString());
taskIdListBuilder = new StringBuilder(taskId + "_" + attemptId);
} else {
@@ -1145,7 +1202,6 @@ public class Repartitioner {
if (taskIdListBuilder.length() > 0) {
taskIdsParams.add(taskIdListBuilder.toString());
}
- urlPrefix.append("&ta=");
for (String param : taskIdsParams) {
fetchURLs.add(URI.create(urlPrefix + param));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 254df64..5f050bf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -65,7 +65,6 @@ import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.RpcParameterFactory;
import org.apache.tajo.util.history.StageHistory;
import org.apache.tajo.util.history.TaskHistory;
-import org.apache.tajo.worker.FetchImpl;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -1189,7 +1188,7 @@ public class Stage implements EventHandler<StageEvent> {
stage.getId(), leftFragment, rightFragments));
}
- public static void scheduleFetches(Stage stage, Map<String, List<FetchImpl>> fetches) {
+ public static void scheduleFetches(Stage stage, Map<String, List<FetchProto>> fetches) {
stage.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
stage.getId(), fetches));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
index 95a7170..9d038ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
@@ -34,6 +34,7 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.master.TaskState;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.event.*;
@@ -46,7 +47,6 @@ import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.util.history.TaskHistory;
-import org.apache.tajo.worker.FetchImpl;
import java.net.URI;
import java.util.*;
@@ -55,8 +55,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import static org.apache.tajo.ResourceProtos.*;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
public class Task implements EventHandler<TaskEvent> {
/** Class Logger */
@@ -70,7 +70,7 @@ public class Task implements EventHandler<TaskEvent> {
private List<ScanNode> scan;
private Map<String, Set<FragmentProto>> fragMap;
- private Map<String, Set<FetchImpl>> fetchMap;
+ private Map<String, Set<FetchProto>> fetchMap;
private int totalFragmentNum;
@@ -100,6 +100,8 @@ public class Task implements EventHandler<TaskEvent> {
private TaskHistory finalTaskHistory;
+ private final int maxUrlLength;
+
protected static final StateMachineFactory
<Task, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
new StateMachineFactory <Task, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
@@ -207,6 +209,8 @@ public class Task implements EventHandler<TaskEvent> {
stateMachine = stateMachineFactory.make(this);
totalFragmentNum = 0;
+ maxUrlLength = conf.getInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(),
+ ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.defaultIntVal);
}
public boolean isLeafTask() {
@@ -282,9 +286,9 @@ public class Task implements EventHandler<TaskEvent> {
taskHistory.setFragments(fragmentList.toArray(new String[fragmentList.size()]));
List<String[]> fetchList = new ArrayList<>();
- for (Map.Entry<String, Set<FetchImpl>> e : getFetchMap().entrySet()) {
- for (FetchImpl f : e.getValue()) {
- for (URI uri : f.getSimpleURIs()){
+ for (Map.Entry<String, Set<FetchProto>> e : getFetchMap().entrySet()) {
+ for (FetchProto f : e.getValue()) {
+ for (URI uri : Repartitioner.createSimpleURIs(maxUrlLength, f)) {
fetchList.add(new String[] {e.getKey(), uri.toString()});
}
}
@@ -364,8 +368,8 @@ public class Task implements EventHandler<TaskEvent> {
return succeededWorker;
}
- public void addFetches(String tableId, Collection<FetchImpl> fetches) {
- Set<FetchImpl> fetchSet;
+ public void addFetches(String tableId, Collection<FetchProto> fetches) {
+ Set<FetchProto> fetchSet;
if (fetchMap.containsKey(tableId)) {
fetchSet = fetchMap.get(tableId);
} else {
@@ -375,7 +379,7 @@ public class Task implements EventHandler<TaskEvent> {
fetchMap.put(tableId, fetchSet);
}
- public void setFetches(Map<String, Set<FetchImpl>> fetches) {
+ public void setFetches(Map<String, Set<FetchProto>> fetches) {
this.fetchMap.clear();
this.fetchMap.putAll(fetches);
}
@@ -395,27 +399,15 @@ public class Task implements EventHandler<TaskEvent> {
public TaskId getId() {
return taskId;
}
-
- public Collection<FetchImpl> getFetchHosts(String tableId) {
- return fetchMap.get(tableId);
- }
-
- public Collection<Set<FetchImpl>> getFetches() {
+
+ public Collection<Set<FetchProto>> getFetches() {
return fetchMap.values();
}
- public Map<String, Set<FetchImpl>> getFetchMap() {
+ public Map<String, Set<FetchProto>> getFetchMap() {
return fetchMap;
}
-
- public Collection<FetchImpl> getFetch(ScanNode scan) {
- return this.fetchMap.get(scan.getTableName());
- }
-
- public ScanNode[] getScanNodes() {
- return this.scan.toArray(new ScanNode[scan.size()]);
- }
-
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
@@ -426,10 +418,10 @@ public class Task implements EventHandler<TaskEvent> {
builder.append(fragment).append(", ");
}
}
- for (Entry<String, Set<FetchImpl>> e : fetchMap.entrySet()) {
+ for (Entry<String, Set<FetchProto>> e : fetchMap.entrySet()) {
builder.append(e.getKey()).append(" : ");
- for (FetchImpl t : e.getValue()) {
- for (URI uri : t.getURIs()){
+ for (FetchProto t : e.getValue()) {
+ for (URI uri : Repartitioner.createFullURIs(maxUrlLength, t)){
builder.append(uri).append(" ");
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 65cb6ac..098567a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -20,6 +20,12 @@ package org.apache.tajo.worker;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.*;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.codec.http.HttpHeaders.Names;
+import io.netty.handler.codec.http.HttpHeaders.Values;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -32,20 +38,20 @@ import org.apache.tajo.TajoProtos;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.rpc.AsyncRpcClient;
-import org.apache.tajo.rpc.CallFuture;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.*;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.util.Pair;
import org.apache.tajo.worker.event.ExecutionBlockErrorEvent;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -191,10 +197,64 @@ public class ExecutionBlockContext {
}
tasks.clear();
taskHistories.clear();
+
+ // Clear index cache in pull server
+ clearIndexCache();
+
resource.release();
RpcClientManager.cleanup(queryMasterClient);
}
+  /**
+   * Send a request to {@link TajoPullServerService} to clear index cache.
+   * <p>
+   * Connects to the pull server port of this worker's connection info and writes an HTTP DELETE
+   * request naming the execution block whose id is one less than the current one. Nothing is sent
+   * when the current execution block id is 1 or less. A failed connection is logged and ignored.
+   *
+   * @throws RuntimeException if the calling thread is interrupted while waiting for the connection;
+   *                          the thread's interrupt status is restored before the exception is thrown
+   */
+  private void clearIndexCache() {
+    // Avoid unnecessary cache clear request when the current eb is a leaf eb
+    if (executionBlockId.getId() > 1) {
+      Bootstrap bootstrap = new Bootstrap()
+          .group(NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER, 1))
+          .channel(NioSocketChannel.class)
+          .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000)
+          .option(ChannelOption.TCP_NODELAY, true);
+      ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
+        @Override
+        protected void initChannel(Channel channel) throws Exception {
+          ChannelPipeline pipeline = channel.pipeline();
+          pipeline.addLast("codec", new HttpClientCodec());
+        }
+      };
+      bootstrap.handler(initializer);
+
+      WorkerConnectionInfo connInfo = workerContext.getConnectionInfo();
+      ChannelFuture future = bootstrap.connect(new InetSocketAddress(connInfo.getHost(), connInfo.getPullServerPort()))
+          .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+
+      try {
+        Channel channel = future.await().channel();
+        if (!future.isSuccess()) {
+          // Upon failure to connect to pull server, cache clear message is just ignored.
+          LOG.warn(future.cause());
+          return;
+        }
+
+        // Example of URI: /ebid=eb_1450063997899_0015_000002
+        ExecutionBlockId clearEbId = new ExecutionBlockId(executionBlockId.getQueryId(), executionBlockId.getId() - 1);
+        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.DELETE, "ebid=" + clearEbId.toString());
+        request.headers().set(Names.HOST, connInfo.getHost());
+        request.headers().set(Names.CONNECTION, Values.CLOSE);
+        channel.writeAndFlush(request);
+      } catch (InterruptedException e) {
+        // Restore the interrupt status so that callers further up the stack can still observe it.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } finally {
+        if (future != null && future.channel().isOpen()) {
+          // Close the channel to exit.
+          future.channel().close();
+        }
+      }
+    }
+  }
+
public TajoConf getConf() {
return systemConf;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
index 7d2033c..b49d449 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
@@ -21,16 +21,21 @@ package org.apache.tajo.worker;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.ResourceProtos.FetchProto;
import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.querymaster.Repartitioner;
import org.apache.tajo.querymaster.Task;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TUtil;
-import java.net.URI;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
@@ -42,8 +47,8 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
private ShuffleType type; // hash or range partition method.
private ExecutionBlockId executionBlockId; // The executionBlock id
private int partitionId; // The hash partition id
- private String name; // The intermediate source name
- private String rangeParams; // optional, the http parameters of range partition. (e.g., start=xx&end=yy)
+ private final String name; // The intermediate source name
+ private RangeParam rangeParam; // optional, range parameter for range shuffle
private boolean hasNext = false; // optional, if true, has more taskIds
private List<Integer> taskIds; // repeated, the task ids
@@ -52,19 +57,48 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
private long offset = -1;
private long length = -1;
- public FetchImpl() {
- taskIds = new ArrayList<>();
- attemptIds = new ArrayList<>();
+  /**
+   * Range parameter of a range-shuffle fetch: the encoded start and end keys of the range plus a
+   * flag carried alongside them.
+   * <p>
+   * Instances compare by value: two parameters are equal when their start bytes, end bytes and
+   * flag are all equal. The byte arrays are stored and returned without copying, so callers must
+   * not modify them.
+   */
+  public static class RangeParam {
+    private final byte[] start;
+    private final byte[] end;
+    // NOTE(review): presumably marks that the end key itself belongs to the range — confirm with callers.
+    private final boolean lastInclusive;
+
+    /**
+     * Creates a range parameter by encoding the start and end of the given range.
+     *
+     * @param range         range whose start and end are encoded
+     * @param lastInclusive flag stored as-is
+     * @param encoder       encoder used to turn the range bounds into bytes
+     */
+    public RangeParam(TupleRange range, boolean lastInclusive, RowStoreEncoder encoder) {
+      this.start = encoder.toBytes(range.getStart());
+      this.end = encoder.toBytes(range.getEnd());
+      this.lastInclusive = lastInclusive;
+    }
+
+    /**
+     * Creates a range parameter from already encoded bounds.
+     *
+     * @param start         encoded start key
+     * @param end           encoded end key
+     * @param lastInclusive flag stored as-is
+     */
+    public RangeParam(byte[] start, byte[] end, boolean lastInclusive) {
+      this.start = start;
+      this.end = end;
+      this.lastInclusive = lastInclusive;
+    }
+
+    /** @return the encoded start key (the internal array, not a copy) */
+    public byte[] getStart() {
+      return start;
+    }
+
+    /** @return the encoded end key (the internal array, not a copy) */
+    public byte[] getEnd() {
+      return end;
+    }
+
+    /** @return the flag given at construction time */
+    public boolean isLastInclusive() {
+      return lastInclusive;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(Arrays.hashCode(start), Arrays.hashCode(end), lastInclusive);
+    }
+
+    /**
+     * Value equality consistent with {@link #hashCode()}: compares the contents of both byte arrays
+     * and the flag.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof RangeParam)) {
+        return false;
+      }
+      RangeParam other = (RangeParam) o;
+      return lastInclusive == other.lastInclusive
+          && Arrays.equals(start, other.start)
+          && Arrays.equals(end, other.end);
+    }
+  }
public FetchImpl(FetchProto proto) {
- this(new Task.PullHost(proto.getHost(), proto.getPort()),
+ this(proto.getName(),
+ new Task.PullHost(proto.getHost(), proto.getPort()),
proto.getType(),
new ExecutionBlockId(proto.getExecutionBlockId()),
proto.getPartitionId(),
- proto.getRangeParams(),
proto.getHasNext(),
- proto.getName(),
proto.getTaskIdList(), proto.getAttemptIdList());
if (proto.hasOffset()) {
@@ -74,31 +108,41 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
if (proto.hasLength()) {
this.length = proto.getLength();
}
+
+ if (proto.hasRangeStart()) {
+ this.rangeParam = new RangeParam(proto.getRangeStart().toByteArray(),
+ proto.getRangeEnd().toByteArray(), proto.getRangeLastInclusive());
+ }
}
- public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+ public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
int partitionId) {
- this(host, type, executionBlockId, partitionId, null, false, null,
- new ArrayList<>(), new ArrayList<>());
+ this(name, host, type, executionBlockId, partitionId, null, false, new ArrayList<>(), new ArrayList<>());
}
- public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+ public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
int partitionId, List<Task.IntermediateEntry> intermediateEntryList) {
- this(host, type, executionBlockId, partitionId, null, false, null,
+ this(name, host, type, executionBlockId, partitionId, null, false,
new ArrayList<>(), new ArrayList<>());
for (Task.IntermediateEntry entry : intermediateEntryList){
addPart(entry.getTaskId(), entry.getAttemptId());
}
}
- public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
- int partitionId, String rangeParams, boolean hasNext, String name,
+ public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+ int partitionId, boolean hasNext,
+ List<Integer> taskIds, List<Integer> attemptIds) {
+ this(name, host, type, executionBlockId, partitionId, null, hasNext, taskIds, attemptIds);
+ }
+
+ public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+ int partitionId, RangeParam rangeParam, boolean hasNext,
List<Integer> taskIds, List<Integer> attemptIds) {
this.host = host;
this.type = type;
this.executionBlockId = executionBlockId;
this.partitionId = partitionId;
- this.rangeParams = rangeParams;
+ this.rangeParam = rangeParam;
this.hasNext = hasNext;
this.name = name;
this.taskIds = taskIds;
@@ -107,7 +151,7 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
@Override
public int hashCode() {
- return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParams,
+ return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParam, // may be null; hashed null-safely
hasNext, taskIds, attemptIds, offset, length);
}
@@ -123,11 +167,14 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
builder.setHasNext(hasNext);
builder.setName(name);
- if (rangeParams != null && !rangeParams.isEmpty()) {
- builder.setRangeParams(rangeParams);
+ if (rangeParam != null) {
+ builder.setRangeStart(ByteString.copyFrom(rangeParam.getStart()));
+ builder.setRangeEnd(ByteString.copyFrom(rangeParam.getEnd()));
+ builder.setRangeLastInclusive(rangeParam.isLastInclusive());
}
Preconditions.checkArgument(taskIds.size() == attemptIds.size());
+
builder.addAllTaskId(taskIds);
builder.addAllAttemptId(attemptIds);
@@ -141,10 +188,6 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
this.attemptIds.add(attemptId);
}
- public Task.PullHost getPullHost() {
- return this.host;
- }
-
public ExecutionBlockId getExecutionBlockId() {
return executionBlockId;
}
@@ -153,20 +196,8 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
this.executionBlockId = executionBlockId;
}
- public int getPartitionId() {
- return partitionId;
- }
-
- public void setPartitionId(int partitionId) {
- this.partitionId = partitionId;
- }
-
- public String getRangeParams() {
- return rangeParams;
- }
-
- public void setRangeParams(String rangeParams) {
- this.rangeParams = rangeParams;
+ public void setRangeParams(RangeParam rangeParams) {
+ this.rangeParam = rangeParams;
}
public boolean hasNext() {
@@ -185,36 +216,10 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
this.type = type;
}
- /**
- * Get the pull server URIs.
- */
- public List<URI> getURIs(){
- return Repartitioner.createFetchURL(this, true);
- }
-
- /**
- * Get the pull server URIs without repeated parameters.
- */
- public List<URI> getSimpleURIs(){
- return Repartitioner.createFetchURL(this, false);
- }
-
public String getName() {
return name;
}
- public void setName(String name) {
- this.name = name;
- }
-
- public List<Integer> getTaskIds() {
- return taskIds;
- }
-
- public List<Integer> getAttemptIds() {
- return attemptIds;
- }
-
public long getOffset() {
return offset;
}
@@ -238,8 +243,7 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
newFetchImpl.type = type;
newFetchImpl.executionBlockId = executionBlockId;
newFetchImpl.partitionId = partitionId;
- newFetchImpl.name = name;
- newFetchImpl.rangeParams = rangeParams;
+ newFetchImpl.rangeParam = rangeParam;
newFetchImpl.hasNext = hasNext;
if (taskIds != null) {
newFetchImpl.taskIds = Lists.newArrayList(taskIds);
@@ -269,7 +273,7 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
TUtil.checkEquals(executionBlockId, fetch.executionBlockId) &&
TUtil.checkEquals(host, fetch.host) &&
TUtil.checkEquals(name, fetch.name) &&
- TUtil.checkEquals(rangeParams, fetch.rangeParams) &&
+ TUtil.checkEquals(rangeParam, fetch.rangeParam) &&
TUtil.checkEquals(taskIds, fetch.taskIds) &&
TUtil.checkEquals(type, fetch.type) &&
TUtil.checkEquals(offset, fetch.offset) &&
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index b5abffe..250b4cc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -31,7 +31,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.rpc.NettyUtils;
@@ -42,6 +44,8 @@ import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.TimeUnit;
/**
@@ -67,6 +71,7 @@ public class Fetcher {
private TajoProtos.FetcherState state;
private Bootstrap bootstrap;
+ private List<Long> chunkLengths = new ArrayList<>();
public Fetcher(TajoConf conf, URI uri, FileChunk chunk) {
this.uri = uri;
@@ -97,9 +102,6 @@ public class Fetcher {
conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
.option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
.option(ChannelOption.TCP_NODELAY, true);
-
- ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile());
- bootstrap.handler(initializer);
}
}
@@ -123,12 +125,20 @@ public class Fetcher {
return messageReceiveCount;
}
- public FileChunk get() throws IOException {
+ public List<FileChunk> get() throws IOException {
+ List<FileChunk> fileChunks = new ArrayList<>();
if (useLocalFile) {
startTime = System.currentTimeMillis();
finishTime = System.currentTimeMillis();
state = TajoProtos.FetcherState.FETCH_FINISHED;
- return fileChunk;
+ fileChunks.add(fileChunk);
+ fileLen = fileChunk.getFile().length();
+ return fileChunks;
+ }
+
+ if (state == FetcherState.FETCH_INIT) {
+ ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile());
+ bootstrap.handler(initializer);
}
this.startTime = System.currentTimeMillis();
@@ -136,7 +146,7 @@ public class Fetcher {
ChannelFuture future = null;
try {
future = bootstrap.clone().connect(new InetSocketAddress(host, port))
- .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+ .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
// Wait until the connection attempt succeeds or fails.
Channel channel = future.awaitUninterruptibly().channel();
@@ -154,7 +164,7 @@ public class Fetcher {
request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
if(LOG.isDebugEnabled()) {
- LOG.info("Status: " + getState() + ", URI:" + uri);
+ LOG.debug("Status: " + getState() + ", URI:" + uri);
}
// Send the HTTP request.
channel.writeAndFlush(request);
@@ -163,7 +173,18 @@ public class Fetcher {
channel.closeFuture().syncUninterruptibly();
fileChunk.setLength(fileChunk.getFile().length());
- return fileChunk;
+
+ long start = 0;
+ for (Long eachChunkLength : chunkLengths) {
+ if (eachChunkLength == 0) continue;
+ FileChunk chunk = new FileChunk(fileChunk.getFile(), start, eachChunkLength);
+ chunk.setEbId(fileChunk.getEbId());
+ chunk.setFromRemote(fileChunk.fromRemote());
+ fileChunks.add(chunk);
+ start += eachChunkLength;
+ }
+ return fileChunks;
+
} finally {
if(future != null && future.channel().isOpen()){
// Close the channel to exit.
@@ -226,6 +247,13 @@ public class Fetcher {
}
}
}
+ if (response.headers().contains(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME)) {
+ String stringOffset = response.headers().get(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME);
+
+ for (String eachSplit : stringOffset.split(",")) {
+ chunkLengths.add(Long.parseLong(eachSplit));
+ }
+ }
}
if (LOG.isDebugEnabled()) {
LOG.debug(sb.toString());
@@ -296,6 +324,7 @@ public class Fetcher {
if(getState() != TajoProtos.FetcherState.FETCH_FINISHED){
//channel is closed, but cannot complete fetcher
finishTime = System.currentTimeMillis();
+ LOG.error("Channel closed by peer: " + ctx.channel());
state = TajoProtos.FetcherState.FETCH_FAILED;
}
IOUtils.cleanup(LOG, fc, raf);