You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2016/04/27 06:27:11 UTC
[4/4] tajo git commit: TAJO-2122: PullServer as an Auxiliary service of Yarn.
TAJO-2122: PullServer as an Auxiliary service of Yarn.
Closes #1001
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/73ac4b87
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/73ac4b87
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/73ac4b87
Branch: refs/heads/master
Commit: 73ac4b87d0f7389b79be8a847e4215fc59befaff
Parents: 71193b2
Author: Jihoon Son <ji...@apache.org>
Authored: Wed Apr 27 13:26:28 2016 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Wed Apr 27 13:26:28 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 +
pom.xml | 3 +-
.../org/apache/tajo/TajoTestingCluster.java | 2 +-
.../java/org/apache/tajo/conf/TajoConf.java | 1 +
.../org/apache/tajo/exception/ErrorUtil.java | 14 +-
tajo-common/src/main/proto/tajo_protos.proto | 8 +-
tajo-core-tests/pom.xml | 4 +
.../physical/TestProgressExternalSortExec.java | 2 -
.../apache/tajo/master/TestRepartitioner.java | 10 +-
.../apache/tajo/querymaster/TestKillQuery.java | 3 +-
.../apache/tajo/worker/MockExecutionBlock.java | 42 --
.../tajo/worker/MockExecutionBlockContext.java | 42 ++
.../apache/tajo/worker/MockTaskExecutor.java | 2 +-
.../org/apache/tajo/worker/MockTaskManager.java | 3 +-
.../org/apache/tajo/worker/TestFetcher.java | 236 -------
.../worker/TestFetcherWithTajoPullServer.java | 437 ++++++++++++
.../apache/tajo/worker/TestTaskExecutor.java | 2 +-
.../apache/tajo/querymaster/Repartitioner.java | 96 +--
.../org/apache/tajo/worker/AbstractFetcher.java | 89 +++
.../tajo/worker/ExecutionBlockContext.java | 10 +-
.../worker/ExecutionBlockSharedResource.java | 16 +
.../java/org/apache/tajo/worker/Fetcher.java | 356 ----------
.../org/apache/tajo/worker/LocalFetcher.java | 480 +++++++++++++
.../org/apache/tajo/worker/RemoteFetcher.java | 317 +++++++++
.../java/org/apache/tajo/worker/TajoWorker.java | 18 +-
.../main/java/org/apache/tajo/worker/Task.java | 2 +-
.../java/org/apache/tajo/worker/TaskImpl.java | 156 +----
.../org/apache/tajo/worker/TaskManager.java | 19 +-
.../resources/webapps/worker/taskdetail.jsp | 4 +-
tajo-dist/pom.xml | 3 +
tajo-docs/src/main/sphinx/configuration.rst | 1 +
.../main/sphinx/configuration/cluster_setup.rst | 18 +-
.../configuration/pullserver_configuration.rst | 75 ++
tajo-project/pom.xml | 12 +-
tajo-pullserver/pom.xml | 6 +-
.../tajo/pullserver/FadvisedFileRegion.java | 1 +
.../tajo/pullserver/FileCloseListener.java | 14 +-
.../tajo/pullserver/PullServerConstants.java | 93 +++
.../apache/tajo/pullserver/PullServerUtil.java | 688 ++++++++++++++++++-
.../apache/tajo/pullserver/TajoPullServer.java | 5 -
.../tajo/pullserver/TajoPullServerService.java | 683 +++++-------------
.../tajo/pullserver/retriever/FileChunk.java | 4 +-
.../pullserver/retriever/FileChunkMeta.java | 53 ++
.../pullserver/retriever/IndexCacheKey.java | 63 ++
tajo-yarn/pom.xml | 265 +++++++
.../apache/tajo/yarn/FadvisedChunkedFile.java | 82 +++
.../apache/tajo/yarn/FadvisedFileRegion.java | 173 +++++
.../org/apache/tajo/yarn/FileCloseListener.java | 41 ++
.../apache/tajo/yarn/TajoPullServerService.java | 608 ++++++++++++++++
49 files changed, 3794 insertions(+), 1470 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 38aedda..b729cce 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,8 @@ Release 0.12.0 - unreleased
NEW FEATURES
+ TAJO-2122: PullServer as an Auxiliary service of Yarn. (jihoon)
+
TAJO-2109: Implement Radix sort. (jihoon)
TAJO-1955: Add a feature to strip quotes from CSV file. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d255652..71c062b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,12 +92,13 @@
<module>tajo-sql-parser</module>
<module>tajo-storage</module>
<module>tajo-pullserver</module>
- <module>tajo-dist</module>
+ <module>tajo-yarn</module>
<module>tajo-thirdparty/asm</module>
<module>tajo-cli</module>
<module>tajo-metrics</module>
<module>tajo-core-tests</module>
<module>tajo-cluster-tests</module>
+ <module>tajo-dist</module>
</modules>
<build>
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 4e7d236..b1a3306 100644
--- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -162,7 +162,7 @@ public class TajoTestingCluster {
conf.setInt(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE.varname, 1);
conf.setInt(ConfVars.$EXECUTOR_HASH_SHUFFLE_BUFFER_SIZE.varname, 1);
- /** decrease Hbase thread and memory cache for testing */
+ /* decrease Hbase thread and memory cache for testing */
//server handler
conf.setInt("hbase.regionserver.handler.count", 5);
//client handler
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 2e2fb18..440af80 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -214,6 +214,7 @@ public class TajoConf extends Configuration {
PULLSERVER_CACHE_TIMEOUT("tajo.pullserver.index-cache.timeout-min", 5, Validators.min("1")),
PULLSERVER_FETCH_URL_MAX_LENGTH("tajo.pullserver.fetch-url.max-length", StorageUnit.KB,
Validators.min("1")),
+ YARN_SHUFFLE_SERVICE_ENABLED("tajo.shuffle.yarn-service.enabled", false, Validators.bool()),
SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()),
SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", BuiltinStorages.RAW, Validators.javaString()),
SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num",
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
index 9a71bd6..957b3d1 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
@@ -34,12 +34,14 @@ public class ErrorUtil {
public static Stacktrace.StackTrace convertStacktrace(Throwable t) {
Stacktrace.StackTrace.Builder builder = Stacktrace.StackTrace.newBuilder();
- for (StackTraceElement element : t.getStackTrace()) {
- builder.addElement(Stacktrace.StackTrace.Element.newBuilder()
- .setFilename(element.getFileName() == null ? "(Unknown Source)" : element.getFileName())
- .setFunction(element.getClassName() + "::" + element.getMethodName())
- .setLine(element.getLineNumber())
- );
+ if (t != null) {
+ for (StackTraceElement element : t.getStackTrace()) {
+ builder.addElement(Stacktrace.StackTrace.Element.newBuilder()
+ .setFilename(element.getFileName() == null ? "(Unknown Source)" : element.getFileName())
+ .setFunction(element.getClassName() + "::" + element.getMethodName())
+ .setLine(element.getLineNumber())
+ );
+ }
}
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-common/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto
index 1795107..78d48c4 100644
--- a/tajo-common/src/main/proto/tajo_protos.proto
+++ b/tajo-common/src/main/proto/tajo_protos.proto
@@ -49,9 +49,11 @@ enum TaskAttemptState {
enum FetcherState {
FETCH_INIT = 0;
- FETCH_FETCHING = 1;
- FETCH_FINISHED = 2;
- FETCH_FAILED = 3;
+ FETCH_META_FETCHING = 1;
+ FETCH_META_FINISHED = 2;
+ FETCH_DATA_FETCHING = 3;
+ FETCH_DATA_FINISHED = 4;
+ FETCH_FAILED = 5;
}
message WorkerConnectionInfoProto {
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core-tests/pom.xml b/tajo-core-tests/pom.xml
index b12642a..3554df4 100644
--- a/tajo-core-tests/pom.xml
+++ b/tajo-core-tests/pom.xml
@@ -174,6 +174,10 @@
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-yarn</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
<artifactId>tajo-rpc-protobuf</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
index eeb179f..51cd5ea 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -173,13 +173,11 @@ public class TestProgressExternalSortExec {
while ((tuple = exec.next()) != null) {
if (cnt == 0) {
initProgress = exec.getProgress();
- System.out.println(initProgress);
assertTrue(initProgress > 0.5f && initProgress < 1.0f);
}
if (cnt == testDataStats.getNumRows() / 2) {
float progress = exec.getProgress();
- System.out.println(progress);
assertTrue(progress > initProgress);
}
curVal = tuple;
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index a13a750..abec6a0 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -20,12 +20,13 @@ package org.apache.tajo.master;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import io.netty.handler.codec.http.QueryStringDecoder;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.QueryId;
import org.apache.tajo.ResourceProtos.FetchProto;
import org.apache.tajo.TestTajoIds;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
import org.apache.tajo.querymaster.Repartitioner;
import org.apache.tajo.querymaster.Task;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
@@ -88,12 +89,11 @@ public class TestRepartitioner {
assertEquals(1, uris.size()); //In Hash Suffle, Fetcher return only one URI per partition.
URI uri = uris.get(0);
- final Map<String, List<String>> params =
- new QueryStringDecoder(uri).parameters();
+ final PullServerParams params = new PullServerParams(uri);
assertEquals(eachEntry.getKey().toString(), params.get("p").get(0));
- assertEquals("h", params.get("type").get(0));
- assertEquals("" + sid.getId(), params.get("sid").get(0));
+ assertEquals(PullServerConstants.HASH_SHUFFLE_PARAM_STRING, params.shuffleType());
+ assertEquals("" + sid.getId(), params.ebId());
}
Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> mergedHashEntries =
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index ac5efd9..8d33dbc 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -26,7 +26,6 @@ import org.apache.tajo.*;
import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.parser.sql.SQLAnalyzer;
@@ -269,7 +268,7 @@ public class TestKillQuery {
}
};
- ExecutionBlockContext context = new MockExecutionBlock(workerContext, requestProtoBuilder.build()) {
+ ExecutionBlockContext context = new MockExecutionBlockContext(workerContext, requestProtoBuilder.build()) {
@Override
public Path createBaseDir() throws IOException {
return new Path("test");
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
deleted file mode 100644
index cbc4312..0000000
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
-import org.apache.tajo.TaskAttemptId;
-
-import java.io.IOException;
-
-public class MockExecutionBlock extends ExecutionBlockContext {
-
- public MockExecutionBlock(TajoWorker.WorkerContext workerContext,
- ExecutionBlockContextResponse request) throws IOException {
- super(workerContext, request, null);
- }
-
- @Override
- public void init() throws Throwable {
- //skip
- }
-
- @Override
- public void fatalError(TaskAttemptId taskAttemptId, Throwable throwable) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlockContext.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlockContext.java
new file mode 100644
index 0000000..b64ab9b
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlockContext.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
+import org.apache.tajo.TaskAttemptId;
+
+import java.io.IOException;
+
+public class MockExecutionBlockContext extends ExecutionBlockContext {
+
+ public MockExecutionBlockContext(TajoWorker.WorkerContext workerContext,
+ ExecutionBlockContextResponse request) throws IOException {
+ super(workerContext, request, null, null);
+ }
+
+ @Override
+ public void init() throws Throwable {
+ //skip
+ }
+
+ @Override
+ public void fatalError(TaskAttemptId taskAttemptId, Throwable throwable) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
index 071d26a..ea609b1 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
@@ -145,7 +145,7 @@ public class MockTaskExecutor extends TaskExecutor {
}
@Override
- public List<Fetcher> getFetchers() {
+ public List<AbstractFetcher> getFetchers() {
return null;
}
};
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
index 5979bbb..0e114bb 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
@@ -29,7 +29,6 @@ import org.apache.tajo.worker.event.TaskManagerEvent;
import java.io.IOException;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeoutException;
public class MockTaskManager extends TaskManager {
@@ -61,7 +60,7 @@ public class MockTaskManager extends TaskManager {
.setQueryContext(new QueryContext(new TajoConf()).getProto())
.setQueryOutputPath("testpath")
.setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
- return new MockExecutionBlock(getWorkerContext(), builder.build());
+ return new MockExecutionBlockContext(getWorkerContext(), builder.build());
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
deleted file mode 100644
index dfc37b0..0000000
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import org.apache.hadoop.fs.*;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Random;
-
-import static org.junit.Assert.*;
-
-public class TestFetcher {
- private String TEST_DATA = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestFetcher";
- private String INPUT_DIR = TEST_DATA+"/in/";
- private String OUTPUT_DIR = TEST_DATA+"/out/";
- private TajoConf conf = new TajoConf();
- private TajoPullServerService pullServerService;
-
- @Before
- public void setUp() throws Exception {
- CommonTestingUtil.getTestDir(TEST_DATA);
- CommonTestingUtil.getTestDir(INPUT_DIR);
- CommonTestingUtil.getTestDir(OUTPUT_DIR);
- conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, INPUT_DIR);
- conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT, 1);
- conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE, 127);
-
- pullServerService = new TajoPullServerService();
- pullServerService.init(conf);
- pullServerService.start();
- }
-
- @After
- public void tearDown(){
- pullServerService.stop();
- }
-
- @Test
- public void testGet() throws IOException {
- Random rnd = new Random();
- QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
- String sid = "1";
- String partId = "1";
-
- int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
- String dataPath = conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) +
- queryId.toString() + "/output/" + sid + "/hash-shuffle/" + partParentId + "/" + partId;
-
- String params = String.format("qid=%s&sid=%s&p=%s&type=%s", queryId, sid, partId, "h");
-
- Path inputPath = new Path(dataPath);
- FSDataOutputStream stream = FileSystem.getLocal(conf).create(inputPath, true);
- for (int i = 0; i < 100; i++) {
- String data = ""+rnd.nextInt();
- stream.write(data.getBytes());
- }
- stream.flush();
- stream.close();
-
- URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
- storeChunk.setFromRemote(true);
- final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
- FileChunk chunk = fetcher.get().get(0);
- assertNotNull(chunk);
- assertNotNull(chunk.getFile());
-
- FileSystem fs = FileSystem.getLocal(new TajoConf());
- FileStatus inStatus = fs.getFileStatus(inputPath);
- FileStatus outStatus = fs.getFileStatus(new Path(OUTPUT_DIR, "data"));
-
- assertEquals(inStatus.getLen(), outStatus.getLen());
- assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
- }
-
- @Test
- public void testAdjustFetchProcess() {
- assertEquals(0.0f, TaskImpl.adjustFetchProcess(0, 0), 0);
- assertEquals(0.0f, TaskImpl.adjustFetchProcess(10, 10), 0);
- assertEquals(0.05f, TaskImpl.adjustFetchProcess(10, 9), 0);
- assertEquals(0.1f, TaskImpl.adjustFetchProcess(10, 8), 0);
- assertEquals(0.25f, TaskImpl.adjustFetchProcess(10, 5), 0);
- assertEquals(0.45f, TaskImpl.adjustFetchProcess(10, 1), 0);
- assertEquals(0.5f, TaskImpl.adjustFetchProcess(10, 0), 0);
- }
-
- @Test
- public void testStatus() throws Exception {
- Random rnd = new Random();
- QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
- String sid = "1";
- String ta = "1_0";
- String partId = "1";
-
- String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
- String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
-
- FSDataOutputStream stream = FileSystem.getLocal(conf).create(new Path(dataPath), true);
- for (int i = 0; i < 100; i++) {
- String data = ""+rnd.nextInt();
- stream.write(data.getBytes());
- }
- stream.flush();
- stream.close();
-
- URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
- storeChunk.setFromRemote(true);
- final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
- assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
-
- fetcher.get();
- assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
- }
-
- @Test
- public void testNoContentFetch() throws Exception {
-
- QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
- String sid = "1";
- String ta = "1_0";
- String partId = "1";
-
- String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
- String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
-
- Path inputPath = new Path(dataPath);
- FileSystem fs = FileSystem.getLocal(conf);
- if(fs.exists(inputPath)){
- fs.delete(new Path(dataPath), true);
- }
-
- FSDataOutputStream stream = FileSystem.getLocal(conf).create(new Path(dataPath).getParent(), true);
- stream.close();
-
- URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
- storeChunk.setFromRemote(true);
- final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
- assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
-
- fetcher.get();
- assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
- }
-
- @Test
- public void testFailureStatus() throws Exception {
- Random rnd = new Random();
-
- QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
- String sid = "1";
- String ta = "1_0";
- String partId = "1";
-
- String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
-
- //TajoPullServerService will be throws BAD_REQUEST by Unknown shuffle type
- String shuffleType = "x";
- String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, shuffleType, ta);
-
- FSDataOutputStream stream = FileSystem.getLocal(conf).create(new Path(dataPath), true);
-
- for (int i = 0; i < 100; i++) {
- String data = params + rnd.nextInt();
- stream.write(data.getBytes());
- }
- stream.flush();
- stream.close();
-
- URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
- storeChunk.setFromRemote(true);
- final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
- assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
-
- fetcher.get();
- assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
- }
-
- @Test
- public void testServerFailure() throws Exception {
- QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
- String sid = "1";
- String ta = "1_0";
- String partId = "1";
-
- String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
- String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
-
- URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
- storeChunk.setFromRemote(true);
- final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
- assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
-
- pullServerService.stop();
-
- boolean failure = false;
- try{
- fetcher.get();
- } catch (Throwable e){
- failure = true;
- }
- assertTrue(failure);
- assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcherWithTajoPullServer.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcherWithTajoPullServer.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcherWithTajoPullServer.java
new file mode 100644
index 0000000..8844fce
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcherWithTajoPullServer.java
@@ -0,0 +1,437 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerUtil;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.FetchImpl.RangeParam;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestFetcherWithTajoPullServer {
+ private enum FetchType {
+ LOCAL,
+ REMOTE
+ }
+ private enum PullServerType {
+ TAJO,
+ YARN
+ }
+
+ private final String TEST_DATA = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/" +
+ TestFetcherWithTajoPullServer.class.getSimpleName();
+ private final String INPUT_DIR = TEST_DATA+"/in/";
+ private final String OUTPUT_DIR = TEST_DATA+"/out/";
+ private final TajoConf conf = new TajoConf();
+ private Service pullServerService;
+ private final int maxUrlLength = conf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+ private final String TEST_TABLE_NAME = "test";
+ private final FetchType fetchType;
+ private final PullServerType pullServerType;
+ private int pullserverPort;
+
+ public TestFetcherWithTajoPullServer(FetchType fetchType, PullServerType pullServerType) {
+ this.fetchType = fetchType;
+ this.pullServerType = pullServerType;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ CommonTestingUtil.getTestDir(TEST_DATA);
+ CommonTestingUtil.getTestDir(INPUT_DIR);
+ CommonTestingUtil.getTestDir(OUTPUT_DIR);
+ conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, INPUT_DIR);
+ conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT, 1);
+ conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE, 127);
+
+ if (pullServerType.equals(PullServerType.TAJO)) {
+ pullServerService = new TajoPullServerService();
+ } else {
+ pullServerService = new org.apache.tajo.yarn.TajoPullServerService();
+ }
+ pullServerService.init(conf);
+ pullServerService.start();
+
+ if (pullServerType.equals(PullServerType.TAJO)) {
+ pullserverPort = ((TajoPullServerService)pullServerService).getPort();
+ } else {
+ pullserverPort = ((org.apache.tajo.yarn.TajoPullServerService)pullServerService).getPort();
+ }
+ }
+
+ @After
+ public void tearDown() {
+ pullServerService.stop();
+ }
+
+ @Parameters(name = "{index}: {0}, {1}")
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][] {
+ {FetchType.LOCAL, PullServerType.TAJO},
+ {FetchType.REMOTE, PullServerType.TAJO},
+ {FetchType.LOCAL, PullServerType.YARN},
+ {FetchType.REMOTE, PullServerType.YARN}
+ });
+ }
+
+ private AbstractFetcher getFetcher(URI uri, File data) throws IOException {
+ if (fetchType.equals(FetchType.LOCAL)) {
+ return new LocalFetcher(conf, uri, TEST_TABLE_NAME);
+ } else {
+ FileChunk storeChunk = new FileChunk(data, 0, data.length());
+ storeChunk.setFromRemote(true);
+ return new RemoteFetcher(conf, uri, storeChunk);
+ }
+ }
+
+ @Test
+ public void testGetHashShuffle() throws IOException {
+ Random rnd = new Random();
+ QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+ String sid = "1";
+ String partId = "1";
+
+ Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+ final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+ final Path dataPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+ PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+ maxUrlLength);
+ builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+ .setQueryId(queryId.toString())
+ .setEbId(sid)
+ .setPartId(partId)
+ .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING);
+
+ Path inputPath = new Path(INPUT_DIR, dataPath);
+ FSDataOutputStream stream = FileSystem.getLocal(conf).create(inputPath, true);
+ for (int i = 0; i < 100; i++) {
+ String data = ""+rnd.nextInt();
+ stream.write(data.getBytes());
+ }
+ stream.flush();
+ stream.close();
+
+ URI uri = builder.build(false).get(0);
+ File data = new File(OUTPUT_DIR + "data");
+
+ final AbstractFetcher fetcher = getFetcher(uri, data);
+
+ FileChunk chunk = fetcher.get().get(0);
+ assertNotNull(chunk);
+ assertNotNull(chunk.getFile());
+
+ FileSystem fs = FileSystem.getLocal(new TajoConf());
+ FileStatus inStatus = fs.getFileStatus(inputPath);
+ FileStatus outStatus = fs.getFileStatus(new Path(chunk.getFile().getAbsolutePath()));
+
+ assertEquals(inStatus.getLen(), outStatus.getLen());
+ assertEquals(FetcherState.FETCH_DATA_FINISHED, fetcher.getState());
+ }
+
+ @Test
+ public void testGetRangeShuffle() throws IOException {
+ Random rnd = new Random();
+ QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+ String sid = "1";
+ String partId = "1";
+ String taskId = "1";
+ String attemptId = "0";
+
+ Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+ Path outDir = StorageUtil.concatPath(queryBaseDir, taskId + "_" + attemptId, "output");
+ Path dataPath = StorageUtil.concatPath(outDir, "output");
+ Path indexPath = StorageUtil.concatPath(outDir, "index");
+
+ List<String> strings = new ArrayList<>(100);
+ for (int i = 0; i < 100; i++) {
+ strings.add("" + rnd.nextInt());
+ }
+ Collections.sort(strings);
+
+ Path inputPath = new Path(INPUT_DIR, dataPath);
+ FileSystem fs = FileSystem.getLocal(conf);
+ if (fs.exists(outDir)) {
+ fs.delete(outDir, true);
+ }
+ final FSDataOutputStream stream = fs.create(inputPath, true);
+ BSTIndex index = new BSTIndex(conf);
+ Schema schema = SchemaBuilder.builder().addAll(new Column[] {new Column("rnd", Type.TEXT)}).build();
+ SortSpec[] sortSpecs = new SortSpec[] {new SortSpec(schema.getColumn(0))};
+ BSTIndexWriter writer = index.getIndexWriter(new Path(INPUT_DIR, indexPath), BSTIndex.TWO_LEVEL_INDEX, schema, new BaseTupleComparator(schema, sortSpecs), true);
+ writer.init();
+
+ for (String t : strings) {
+ writer.write(new VTuple(new Datum[] {DatumFactory.createText(t)}), stream.getPos());
+ stream.write(t.getBytes());
+ }
+ stream.flush();
+ writer.flush();
+ stream.close();
+ writer.close();
+
+ RangeParam rangeParam = new RangeParam(new TupleRange(sortSpecs,
+ new VTuple(new Datum[] {DatumFactory.createText(strings.get(0))}),
+ new VTuple(new Datum[] {DatumFactory.createText(strings.get(strings.size() - 1))})), true, RowStoreUtil.createEncoder(schema));
+ PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+ maxUrlLength);
+ builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+ .setQueryId(queryId.toString())
+ .setEbId(sid)
+ .setPartId(partId)
+ .setShuffleType(PullServerConstants.RANGE_SHUFFLE_PARAM_STRING)
+ .setTaskIds(Lists.newArrayList(Integer.parseInt(taskId)))
+ .setAttemptIds(Lists.newArrayList(Integer.parseInt(attemptId)))
+ .setStartKeyBase64(new String(Base64.encodeBase64(rangeParam.getStart())))
+ .setEndKeyBase64(new String(Base64.encodeBase64(rangeParam.getEnd())))
+ .setLastInclude(true);
+
+ URI uri = builder.build(true).get(0);
+ File data = new File(OUTPUT_DIR + "data");
+
+ final AbstractFetcher fetcher = getFetcher(uri, data);
+
+ FileChunk chunk = fetcher.get().get(0);
+ assertNotNull(chunk);
+ assertNotNull(chunk.getFile());
+
+ FileStatus inStatus = fs.getFileStatus(inputPath);
+ FileStatus outStatus = fs.getFileStatus(new Path(chunk.getFile().getAbsolutePath()));
+
+ assertEquals(inStatus.getLen(), outStatus.getLen());
+ assertEquals(FetcherState.FETCH_DATA_FINISHED, fetcher.getState());
+ }
+
+ @Test
+ public void testAdjustFetchProcess() {
+ Assert.assertEquals(0.0f, TaskImpl.adjustFetchProcess(0, 0), 0);
+ assertEquals(0.0f, TaskImpl.adjustFetchProcess(10, 10), 0);
+ assertEquals(0.05f, TaskImpl.adjustFetchProcess(10, 9), 0);
+ assertEquals(0.1f, TaskImpl.adjustFetchProcess(10, 8), 0);
+ assertEquals(0.25f, TaskImpl.adjustFetchProcess(10, 5), 0);
+ assertEquals(0.45f, TaskImpl.adjustFetchProcess(10, 1), 0);
+ assertEquals(0.5f, TaskImpl.adjustFetchProcess(10, 0), 0);
+ }
+
+ @Test
+ public void testStatus() throws Exception {
+ Random rnd = new Random();
+ QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+ String sid = "1";
+ String ta = "1_0";
+ String partId = "1";
+
+ Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+ final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+ final Path dataPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+ PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+ maxUrlLength);
+ builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+ .setQueryId(queryId.toString())
+ .setEbId(sid)
+ .setPartId(partId)
+ .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING)
+ .setTaskAttemptIds(Lists.newArrayList(ta));
+
+ FSDataOutputStream stream = FileSystem.getLocal(conf).create(new Path(INPUT_DIR, dataPath), true);
+ for (int i = 0; i < 100; i++) {
+ String data = ""+rnd.nextInt();
+ stream.write(data.getBytes());
+ }
+ stream.flush();
+ stream.close();
+
+ URI uri = builder.build(true).get(0);
+ File data = new File(OUTPUT_DIR + "data");
+ final AbstractFetcher fetcher = getFetcher(uri, data);
+ assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+ fetcher.get();
+ assertEquals(FetcherState.FETCH_DATA_FINISHED, fetcher.getState());
+ }
+
+ @Test
+ public void testNoContentFetch() throws Exception {
+
+ QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+ String sid = "1";
+ String ta = "1_0";
+ String partId = "1";
+
+ Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+ final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+ final Path dataPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+ PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+ maxUrlLength);
+ builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+ .setQueryId(queryId.toString())
+ .setEbId(sid)
+ .setPartId(partId)
+ .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING)
+ .setTaskAttemptIds(Lists.newArrayList(ta));
+
+ Path inputPath = new Path(INPUT_DIR, dataPath);
+ FileSystem fs = FileSystem.getLocal(conf);
+ if(fs.exists(inputPath)){
+ fs.delete(inputPath, true);
+ }
+
+ FSDataOutputStream stream = fs.create(inputPath, true);
+ stream.close();
+
+ URI uri = builder.build(true).get(0);
+ File data = new File(OUTPUT_DIR + "data");
+ final AbstractFetcher fetcher = getFetcher(uri, data);
+ assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+ try {
+ fetcher.get();
+ if (fetchType.equals(FetchType.LOCAL)) {
+ fail();
+ }
+ } catch (IOException e) {
+ if (fetchType.equals(FetchType.REMOTE)) {
+ fail();
+ }
+ }
+ assertEquals(FetcherState.FETCH_FAILED, fetcher.getState());
+ }
+
+ @Test
+ public void testFailureStatus() throws Exception {
+ Random rnd = new Random();
+
+ QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+ String sid = "1";
+ String ta = "1_0";
+ String partId = "1";
+
+ Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+ final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+ final Path dataPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+ PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+ maxUrlLength);
+ builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+ .setQueryId(queryId.toString())
+ .setEbId(sid)
+ .setPartId(partId)
+ .setShuffleType("x") // TajoPullServerService will throw BAD_REQUEST for an unknown shuffle type
+ .setTaskAttemptIds(Lists.newArrayList(ta));
+
+ FSDataOutputStream stream = FileSystem.getLocal(conf).create(new Path(INPUT_DIR, dataPath), true);
+
+ for (int i = 0; i < 100; i++) {
+ String data = "" + rnd.nextInt();
+ stream.write(data.getBytes());
+ }
+ stream.flush();
+ stream.close();
+
+ URI uri = builder.build(true).get(0);
+ File data = new File(OUTPUT_DIR + "data");
+ final AbstractFetcher fetcher = getFetcher(uri, data);
+ assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+ try {
+ fetcher.get();
+ if (fetchType.equals(FetchType.LOCAL)) {
+ fail();
+ }
+ } catch (IllegalArgumentException e) {
+ if (!fetchType.equals(FetchType.LOCAL)) {
+ fail();
+ }
+ }
+ assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
+ }
+
+ @Test
+ public void testServerFailure() throws Exception {
+ QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+ String sid = "1";
+ String ta = "1_0";
+ String partId = "1";
+
+ PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+ maxUrlLength);
+ builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+ .setQueryId(queryId.toString())
+ .setEbId(sid)
+ .setPartId(partId)
+ .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING)
+ .setTaskAttemptIds(Lists.newArrayList(ta));
+
+ URI uri = builder.build(true).get(0);
+ File data = new File(OUTPUT_DIR + "data");
+ final AbstractFetcher fetcher = getFetcher(uri, data);
+ assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+ pullServerService.stop();
+
+ boolean failure = false;
+ try{
+ fetcher.get();
+ } catch (IOException e){
+ failure = true;
+ }
+ assertTrue(failure);
+ assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
index 45e430e..df5b3c8 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
@@ -305,7 +305,7 @@ public class TestTaskExecutor {
}
@Override
- public List<Fetcher> getFetchers() {
+ public List<AbstractFetcher> getFetchers() {
return null;
}
};
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index 2a688e5..ba051a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -50,6 +50,8 @@ import org.apache.tajo.plan.logical.SortNode.SortPurpose;
import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
import org.apache.tajo.querymaster.Task.PullHost;
import org.apache.tajo.storage.*;
@@ -70,7 +72,6 @@ import java.net.URLEncoder;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*;
@@ -1124,89 +1125,32 @@ public class Repartitioner {
}
public static List<URI> createFetchURL(int maxUrlLength, FetchProto fetch, boolean includeParts) {
- String scheme = "http://";
-
- StringBuilder urlPrefix = new StringBuilder(scheme);
+ PullServerRequestURIBuilder builder =
+ new PullServerRequestURIBuilder(fetch.getHost(), fetch.getPort(), maxUrlLength);
ExecutionBlockId ebId = new ExecutionBlockId(fetch.getExecutionBlockId());
- urlPrefix.append(fetch.getHost()).append(":").append(fetch.getPort()).append("/?")
- .append("qid=").append(ebId.getQueryId().toString())
- .append("&sid=").append(ebId.getId())
- .append("&p=").append(fetch.getPartitionId())
- .append("&type=");
+ builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+ .setQueryId(ebId.getQueryId().toString())
+ .setEbId(ebId.getId())
+ .setPartId(fetch.getPartitionId());
+
if (fetch.getType() == HASH_SHUFFLE) {
- urlPrefix.append("h");
+ builder.setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING);
} else if (fetch.getType() == RANGE_SHUFFLE) {
- urlPrefix.append("r").append("&").append(getRangeParam(fetch));
+ builder.setShuffleType(PullServerConstants.RANGE_SHUFFLE_PARAM_STRING);
+ builder.setStartKeyBase64(new String(org.apache.commons.codec.binary.Base64.encodeBase64(fetch.getRangeStart().toByteArray())));
+ builder.setEndKeyBase64(new String(org.apache.commons.codec.binary.Base64.encodeBase64(fetch.getRangeEnd().toByteArray())));
+ builder.setLastInclude(fetch.getRangeLastInclusive());
} else if (fetch.getType() == SCATTERED_HASH_SHUFFLE) {
- urlPrefix.append("s");
+ builder.setShuffleType(PullServerConstants.SCATTERED_HASH_SHUFFLE_PARAM_STRING);
}
-
if (fetch.getLength() >= 0) {
- urlPrefix.append("&offset=").append(fetch.getOffset()).append("&length=").append(fetch.getLength());
+ builder.setOffset(fetch.getOffset()).setLength(fetch.getLength());
}
-
- List<URI> fetchURLs = new ArrayList<>();
- if(includeParts) {
- if (fetch.getType() == HASH_SHUFFLE || fetch.getType() == SCATTERED_HASH_SHUFFLE) {
- fetchURLs.add(URI.create(urlPrefix.toString()));
- } else {
- urlPrefix.append("&ta=");
- // If the get request is longer than 2000 characters,
- // the long request uri may cause HTTP Status Code - 414 Request-URI Too Long.
- // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
- // The below code transforms a long request to multiple requests.
- List<String> taskIdsParams = new ArrayList<>();
- StringBuilder taskIdListBuilder = new StringBuilder();
-
- final List<Integer> taskIds = fetch.getTaskIdList();
- final List<Integer> attemptIds = fetch.getAttemptIdList();
-
- // Sort task ids to increase cache hit in pull server
- final List<Pair<Integer, Integer>> taskAndAttemptIds = IntStream.range(0, taskIds.size())
- .mapToObj(i -> new Pair<>(taskIds.get(i), attemptIds.get(i)))
- .sorted((p1, p2) -> p1.getFirst() - p2.getFirst())
- .collect(Collectors.toList());
-
- boolean first = true;
-
- for (int i = 0; i < taskAndAttemptIds.size(); i++) {
- StringBuilder taskAttemptId = new StringBuilder();
-
- if (!first) { // when comma is added?
- taskAttemptId.append(",");
- } else {
- first = false;
- }
-
- int taskId = taskAndAttemptIds.get(i).getFirst();
- if (taskId < 0) {
- // In the case of hash shuffle each partition has single shuffle file per worker.
- // TODO If file is large, consider multiple fetching(shuffle file can be split)
- continue;
- }
- int attemptId = taskAndAttemptIds.get(i).getSecond();
- taskAttemptId.append(taskId).append("_").append(attemptId);
-
- if (urlPrefix.length() + taskIdListBuilder.length() > maxUrlLength) {
- taskIdsParams.add(taskIdListBuilder.toString());
- taskIdListBuilder = new StringBuilder(taskId + "_" + attemptId);
- } else {
- taskIdListBuilder.append(taskAttemptId);
- }
- }
- // if the url params remain
- if (taskIdListBuilder.length() > 0) {
- taskIdsParams.add(taskIdListBuilder.toString());
- }
- for (String param : taskIdsParams) {
- fetchURLs.add(URI.create(urlPrefix + param));
- }
- }
- } else {
- fetchURLs.add(URI.create(urlPrefix.toString()));
+ if (includeParts) {
+ builder.setTaskIds(fetch.getTaskIdList());
+ builder.setAttemptIds(fetch.getAttemptIdList());
}
-
- return fetchURLs;
+ return builder.build(includeParts);
}
public static Map<Integer, List<IntermediateEntry>> hashByKey(List<IntermediateEntry> entries) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractFetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractFetcher.java
new file mode 100644
index 0000000..a12db77
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractFetcher.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+public abstract class AbstractFetcher {
+
+ protected final URI uri;
+ protected FileChunk fileChunk;
+ protected final TajoConf conf;
+
+ protected TajoProtos.FetcherState state;
+
+ protected long startTime;
+ protected volatile long finishTime;
+ protected int fileNum;
+ protected long fileLen;
+ protected int messageReceiveCount;
+
+ public AbstractFetcher(TajoConf conf, URI uri) {
+ this(conf, uri, null);
+ }
+
+ public AbstractFetcher(TajoConf conf, URI uri, FileChunk fileChunk) {
+ this.conf = conf;
+ this.uri = uri;
+ this.fileChunk = fileChunk;
+ this.state = TajoProtos.FetcherState.FETCH_INIT;
+ }
+
+ public URI getURI() {
+ return this.uri;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ public long getFileLen() {
+ return fileLen;
+ }
+
+ public int getFileNum() {
+ return fileNum;
+ }
+
+ public TajoProtos.FetcherState getState() {
+ return state;
+ }
+
+ public int getMessageReceiveCount() {
+ return messageReceiveCount;
+ }
+
+ public abstract List<FileChunk> get() throws IOException;
+
+ protected void endFetch(FetcherState state) {
+ this.finishTime = System.currentTimeMillis();
+ this.state = state;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index e675d70..4ab6627 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -37,6 +37,7 @@ import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
+import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.ErrorUtil;
@@ -44,6 +45,7 @@ import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.pullserver.PullServerUtil;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.rpc.*;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
@@ -103,7 +105,7 @@ public class ExecutionBlockContext {
private final Map<TaskId, TaskHistory> taskHistories = Maps.newConcurrentMap();
public ExecutionBlockContext(TajoWorker.WorkerContext workerContext, ExecutionBlockContextResponse request,
- AsyncRpcClient queryMasterClient)
+ AsyncRpcClient queryMasterClient, @Nullable TajoPullServerService pullServerService)
throws IOException {
this.executionBlockId = new ExecutionBlockId(request.getExecutionBlockId());
this.connManager = RpcClientManager.getInstance();
@@ -117,7 +119,7 @@ public class ExecutionBlockContext {
this.queryEngine = new TajoQueryEngine(systemConf);
this.queryContext = new QueryContext(workerContext.getConf(), request.getQueryContext());
this.plan = request.getPlanJson();
- this.resource = new ExecutionBlockSharedResource();
+ this.resource = new ExecutionBlockSharedResource(pullServerService);
this.workerContext = workerContext;
this.shuffleType = request.getShuffleType();
this.queryMasterClient = queryMasterClient;
@@ -281,12 +283,12 @@ public class ExecutionBlockContext {
}
public static Path getBaseOutputDir(ExecutionBlockId executionBlockId) {
- return TajoPullServerService.getBaseOutputDir(
+ return PullServerUtil.getBaseOutputDir(
executionBlockId.getQueryId().toString(), String.valueOf(executionBlockId.getId()));
}
public static Path getBaseInputDir(ExecutionBlockId executionBlockId) {
- return TajoPullServerService.getBaseInputDir(executionBlockId.getQueryId().toString(), executionBlockId.toString());
+ return PullServerUtil.getBaseInputDir(executionBlockId.getQueryId().toString(), executionBlockId.toString());
}
public ExecutionBlockId getExecutionBlockId() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
index 660f875..e1ff917 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.SessionVars;
+import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.engine.codegen.ExecutorPreCompiler;
import org.apache.tajo.engine.codegen.TajoClassLoader;
@@ -35,8 +36,10 @@ import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.util.Pair;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
public class ExecutionBlockSharedResource {
@@ -53,6 +56,15 @@ public class ExecutionBlockSharedResource {
private ExecutorPreCompiler.CompilationContext compilationContext;
private LogicalNode plan;
private boolean codeGenEnabled = false;
+ private final TajoPullServerService pullServerService;
+
+ public ExecutionBlockSharedResource() {
+ this(null);
+ }
+
+ public ExecutionBlockSharedResource(@Nullable TajoPullServerService pullServerService) {
+ this.pullServerService = pullServerService;
+ }
public void initialize(final QueryContext context, final String planJson) {
@@ -130,6 +142,10 @@ public class ExecutionBlockSharedResource {
TableCache.getInstance().releaseCache(id);
}
+ public Optional<TajoPullServerService> getPullServerService() {
+ return pullServerService == null ? Optional.empty() : Optional.of(pullServerService);
+ }
+
public void release() {
compilationContext = null;
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
deleted file mode 100644
index 250b4cc..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.*;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.http.*;
-import io.netty.handler.timeout.ReadTimeoutException;
-import io.netty.handler.timeout.ReadTimeoutHandler;
-import io.netty.util.ReferenceCountUtil;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.TajoProtos.FetcherState;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.rpc.NettyUtils;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Fetcher fetches data from a given URI via the HTTP protocol and stores it into
- * a specific file. It aims at asynchronous and efficient data transmission.
- *
- * <p>When the given {@link FileChunk} is not remote, no network access is performed and
- * {@link #get()} returns that chunk as it is. Otherwise {@link #get()} sends an HTTP GET
- * request, writes the response body into the chunk's file, and splits the file into
- * chunks using the lengths listed in the
- * {@link TajoPullServerService#CHUNK_LENGTH_HEADER_NAME} response header.</p>
- *
- * <p>The fetch state is written by {@link #get()} as well as by the channel handler
- * callbacks; it is volatile so that an update made in a callback is visible to whoever
- * polls {@link #getState()}.</p>
- */
-public class Fetcher {
-
-  private final static Log LOG = LogFactory.getLog(Fetcher.class);
-
-  private final URI uri;
-  private final FileChunk fileChunk;
-  private final TajoConf conf;
-
-  private final String host;
-  private int port;
-  private final boolean useLocalFile;
-
-  private long startTime;
-  private volatile long finishTime;
-  private long fileLen;
-  private int messageReceiveCount;
-  // Written by get() and by the channel handler callbacks; volatile for visibility.
-  private volatile TajoProtos.FetcherState state;
-
-  private Bootstrap bootstrap;
-  // Chunk lengths parsed from the response header of the current fetch attempt.
-  private final List<Long> chunkLengths = new ArrayList<>();
-
-  /**
-   * Creates a fetcher for the given URI.
-   *
-   * @param conf  configuration providing the shuffle fetcher settings
-   * @param uri   location to fetch from; a missing scheme defaults to "http", a missing host
-   *              to "localhost", and a missing port to 80 (http) or 443 (https)
-   * @param chunk target chunk; when {@code chunk.fromRemote()} is false the chunk is treated
-   *              as a local file and no network client is prepared
-   */
-  public Fetcher(TajoConf conf, URI uri, FileChunk chunk) {
-    this.uri = uri;
-    this.fileChunk = chunk;
-    this.useLocalFile = !chunk.fromRemote();
-    this.state = TajoProtos.FetcherState.FETCH_INIT;
-    this.conf = conf;
-
-    String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
-    this.host = uri.getHost() == null ? "localhost" : uri.getHost();
-    this.port = uri.getPort();
-    if (port == -1) {
-      if (scheme.equalsIgnoreCase("http")) {
-        this.port = 80;
-      } else if (scheme.equalsIgnoreCase("https")) {
-        this.port = 443;
-      }
-    }
-
-    if (!useLocalFile) {
-      bootstrap = new Bootstrap()
-          .group(
-              NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
-                  conf.getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
-          .channel(NioSocketChannel.class)
-          .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
-          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
-              conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
-          .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
-          .option(ChannelOption.TCP_NODELAY, true);
-    }
-  }
-
-  /** @return the time (epoch millis) at which the latest {@link #get()} started */
-  public long getStartTime() {
-    return startTime;
-  }
-
-  /** @return the time (epoch millis) at which the latest fetch finished or failed */
-  public long getFinishTime() {
-    return finishTime;
-  }
-
-  /** @return the length of the fetched file, as recorded when the fetch completed */
-  public long getFileLen() {
-    return fileLen;
-  }
-
-  /** @return the current state of this fetcher */
-  public TajoProtos.FetcherState getState() {
-    return state;
-  }
-
-  /** @return the number of messages the channel handler has received so far */
-  public int getMessageReceiveCount() {
-    return messageReceiveCount;
-  }
-
-  /**
-   * Fetches the data and returns the resulting file chunks.
-   *
-   * <p>For a local chunk the given chunk is returned immediately. For a remote chunk this
-   * method blocks until the connection is closed. The returned list is built from the
-   * chunk lengths received so far, whatever the outcome; callers must check
-   * {@link #getState()} to learn whether the fetch actually finished.</p>
-   *
-   * @return the fetched chunks; zero-length chunks are omitted
-   * @throws IOException if the connection attempt fails
-   */
-  public List<FileChunk> get() throws IOException {
-    List<FileChunk> fileChunks = new ArrayList<>();
-    if (useLocalFile) {
-      startTime = System.currentTimeMillis();
-      finishTime = System.currentTimeMillis();
-      state = TajoProtos.FetcherState.FETCH_FINISHED;
-      fileChunks.add(fileChunk);
-      fileLen = fileChunk.getFile().length();
-      return fileChunks;
-    }
-
-    if (state == FetcherState.FETCH_INIT) {
-      ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile());
-      bootstrap.handler(initializer);
-    }
-
-    // This method may be invoked again after a failed attempt. Drop the chunk lengths
-    // recorded by that attempt so they are not duplicated in the result of this one.
-    chunkLengths.clear();
-
-    this.startTime = System.currentTimeMillis();
-    this.state = TajoProtos.FetcherState.FETCH_FETCHING;
-    ChannelFuture future = null;
-    try {
-      future = bootstrap.clone().connect(new InetSocketAddress(host, port))
-          .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
-
-      // Wait until the connection attempt succeeds or fails.
-      Channel channel = future.awaitUninterruptibly().channel();
-      if (!future.isSuccess()) {
-        state = TajoProtos.FetcherState.FETCH_FAILED;
-        throw new IOException(future.cause());
-      }
-
-      String query = uri.getPath()
-          + (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : "");
-      // Prepare the HTTP request.
-      HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query);
-      request.headers().set(HttpHeaders.Names.HOST, host);
-      request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
-      request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
-
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Status: " + getState() + ", URI:" + uri);
-      }
-      // Send the HTTP request.
-      channel.writeAndFlush(request);
-
-      // Wait for the server to close the connection. throw exception if failed
-      channel.closeFuture().syncUninterruptibly();
-
-      fileChunk.setLength(fileChunk.getFile().length());
-
-      // Cut the fetched file into consecutive chunks of the reported lengths.
-      long start = 0;
-      for (Long eachChunkLength : chunkLengths) {
-        if (eachChunkLength == 0) continue;
-        FileChunk chunk = new FileChunk(fileChunk.getFile(), start, eachChunkLength);
-        chunk.setEbId(fileChunk.getEbId());
-        chunk.setFromRemote(fileChunk.fromRemote());
-        fileChunks.add(chunk);
-        start += eachChunkLength;
-      }
-      return fileChunks;
-
-    } finally {
-      if(future != null && future.channel().isOpen()){
-        // Close the channel to exit.
-        future.channel().close().awaitUninterruptibly();
-      }
-
-      this.finishTime = System.currentTimeMillis();
-      long elapsedMills = finishTime - startTime;
-      String transferSpeed;
-      if(elapsedMills > 1000) {
-        long bytePerSec = (fileChunk.length() * 1000) / elapsedMills;
-        transferSpeed = FileUtils.byteCountToDisplaySize(bytePerSec);
-      } else {
-        transferSpeed = FileUtils.byteCountToDisplaySize(Math.max(fileChunk.length(), 0));
-      }
-
-      LOG.info(String.format("Fetcher :%d ms elapsed. %s/sec, len:%d, state:%s, URL:%s",
-          elapsedMills, transferSpeed, fileChunk.length(), getState(), uri));
-    }
-  }
-
-  /** @return the URI this fetcher reads from */
-  public URI getURI() {
-    return this.uri;
-  }
-
-  /**
-   * Channel handler that interprets the HTTP response and writes its body into the
-   * target file, updating the state of the enclosing fetcher as it goes.
-   */
-  class HttpClientHandler extends ChannelInboundHandlerAdapter {
-    private final File file;
-    private RandomAccessFile raf;
-    private FileChannel fc;
-    private long length = -1;
-
-    /**
-     * @param file the file the response body is written to
-     * @throws FileNotFoundException if the file cannot be opened for reading and writing
-     */
-    public HttpClientHandler(File file) throws FileNotFoundException {
-      this.file = file;
-      this.raf = new RandomAccessFile(file, "rw");
-      this.fc = raf.getChannel();
-    }
-
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg)
-        throws Exception {
-
-      messageReceiveCount++;
-      if (msg instanceof HttpResponse) {
-        try {
-          HttpResponse response = (HttpResponse) msg;
-
-          StringBuilder sb = new StringBuilder();
-          if (LOG.isDebugEnabled()) {
-            sb.append("STATUS: ").append(response.getStatus()).append(", VERSION: ")
-                .append(response.getProtocolVersion()).append(", HEADER: ");
-          }
-          if (!response.headers().names().isEmpty()) {
-            for (String name : response.headers().names()) {
-              for (String value : response.headers().getAll(name)) {
-                if (LOG.isDebugEnabled()) {
-                  sb.append(name).append(" = ").append(value);
-                }
-                // HTTP header field names are case-insensitive.
-                if (this.length == -1 && HttpHeaders.Names.CONTENT_LENGTH.equalsIgnoreCase(name)) {
-                  this.length = Long.parseLong(value);
-                }
-              }
-            }
-            if (response.headers().contains(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME)) {
-              String stringOffset = response.headers().get(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME);
-
-              for (String eachSplit : stringOffset.split(",")) {
-                chunkLengths.add(Long.parseLong(eachSplit));
-              }
-            }
-          }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(sb.toString());
-          }
-
-          if (response.getStatus().code() == HttpResponseStatus.NO_CONTENT.code()) {
-            LOG.warn("There are no data corresponding to the request");
-            length = 0;
-            return;
-          } else if (response.getStatus().code() != HttpResponseStatus.OK.code()) {
-            LOG.error(response.getStatus().reasonPhrase());
-            state = TajoProtos.FetcherState.FETCH_FAILED;
-            return;
-          }
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-          // Headers that cannot be interpreted leave the chunk lengths unusable, so the
-          // fetch must not be reported as finished later on.
-          state = TajoProtos.FetcherState.FETCH_FAILED;
-          ctx.close();
-        } finally {
-          ReferenceCountUtil.release(msg);
-        }
-      }
-
-      if (msg instanceof HttpContent) {
-        try {
-          HttpContent httpContent = (HttpContent) msg;
-          ByteBuf content = httpContent.content();
-          if (content.isReadable()) {
-            content.readBytes(fc, content.readableBytes());
-          }
-
-          if (msg instanceof LastHttpContent) {
-            if (raf != null) {
-              fileLen = file.length();
-            }
-
-            finishTime = System.currentTimeMillis();
-            if (state != TajoProtos.FetcherState.FETCH_FAILED) {
-              state = TajoProtos.FetcherState.FETCH_FINISHED;
-            }
-
-            IOUtils.cleanup(LOG, fc, raf);
-          }
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-          // A failed write leaves the file incomplete; mark the fetch as failed so that a
-          // later LastHttpContent cannot flip the state to FETCH_FINISHED, and stop reading.
-          state = TajoProtos.FetcherState.FETCH_FAILED;
-          ctx.close();
-        } finally {
-          ReferenceCountUtil.release(msg);
-        }
-      }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-        throws Exception {
-      if (cause instanceof ReadTimeoutException) {
-        LOG.warn(cause.getMessage(), cause);
-      } else {
-        LOG.error("Fetch failed :", cause);
-      }
-
-      // This fetch will be retried.
-      IOUtils.cleanup(LOG, fc, raf);
-      finishTime = System.currentTimeMillis();
-      state = TajoProtos.FetcherState.FETCH_FAILED;
-      ctx.close();
-    }
-
-    @Override
-    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
-      if(getState() != TajoProtos.FetcherState.FETCH_FINISHED){
-        //channel is closed, but cannot complete fetcher
-        finishTime = System.currentTimeMillis();
-        LOG.error("Channel closed by peer: " + ctx.channel());
-        state = TajoProtos.FetcherState.FETCH_FAILED;
-      }
-      IOUtils.cleanup(LOG, fc, raf);
-
-      super.channelUnregistered(ctx);
-    }
-  }
-
-  /**
-   * Builds the client pipeline for a new channel: HTTP codec, content decompressor,
-   * read timeout, and the handler that writes the response body to the file.
-   */
-  class HttpClientChannelInitializer extends ChannelInitializer<Channel> {
-    private final File file;
-
-    /** @param file the file passed to each {@link HttpClientHandler} this initializer creates */
-    public HttpClientChannelInitializer(File file) {
-      this.file = file;
-    }
-
-    @Override
-    protected void initChannel(Channel channel) throws Exception {
-      ChannelPipeline pipeline = channel.pipeline();
-
-      int maxChunkSize = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE);
-      int readTimeout = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT);
-
-      pipeline.addLast("codec", new HttpClientCodec(4096, 8192, maxChunkSize));
-      pipeline.addLast("inflater", new HttpContentDecompressor());
-      pipeline.addLast("timeout", new ReadTimeoutHandler(readTimeout, TimeUnit.SECONDS));
-      pipeline.addLast("handler", new HttpClientHandler(file));
-    }
-  }
-}