You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2016/04/27 06:27:11 UTC

[4/4] tajo git commit: TAJO-2122: PullServer as an Auxiliary service of Yarn.

TAJO-2122: PullServer as an Auxiliary service of Yarn.

Closes #1001


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/73ac4b87
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/73ac4b87
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/73ac4b87

Branch: refs/heads/master
Commit: 73ac4b87d0f7389b79be8a847e4215fc59befaff
Parents: 71193b2
Author: Jihoon Son <ji...@apache.org>
Authored: Wed Apr 27 13:26:28 2016 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Wed Apr 27 13:26:28 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 pom.xml                                         |   3 +-
 .../org/apache/tajo/TajoTestingCluster.java     |   2 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |   1 +
 .../org/apache/tajo/exception/ErrorUtil.java    |  14 +-
 tajo-common/src/main/proto/tajo_protos.proto    |   8 +-
 tajo-core-tests/pom.xml                         |   4 +
 .../physical/TestProgressExternalSortExec.java  |   2 -
 .../apache/tajo/master/TestRepartitioner.java   |  10 +-
 .../apache/tajo/querymaster/TestKillQuery.java  |   3 +-
 .../apache/tajo/worker/MockExecutionBlock.java  |  42 --
 .../tajo/worker/MockExecutionBlockContext.java  |  42 ++
 .../apache/tajo/worker/MockTaskExecutor.java    |   2 +-
 .../org/apache/tajo/worker/MockTaskManager.java |   3 +-
 .../org/apache/tajo/worker/TestFetcher.java     | 236 -------
 .../worker/TestFetcherWithTajoPullServer.java   | 437 ++++++++++++
 .../apache/tajo/worker/TestTaskExecutor.java    |   2 +-
 .../apache/tajo/querymaster/Repartitioner.java  |  96 +--
 .../org/apache/tajo/worker/AbstractFetcher.java |  89 +++
 .../tajo/worker/ExecutionBlockContext.java      |  10 +-
 .../worker/ExecutionBlockSharedResource.java    |  16 +
 .../java/org/apache/tajo/worker/Fetcher.java    | 356 ----------
 .../org/apache/tajo/worker/LocalFetcher.java    | 480 +++++++++++++
 .../org/apache/tajo/worker/RemoteFetcher.java   | 317 +++++++++
 .../java/org/apache/tajo/worker/TajoWorker.java |  18 +-
 .../main/java/org/apache/tajo/worker/Task.java  |   2 +-
 .../java/org/apache/tajo/worker/TaskImpl.java   | 156 +----
 .../org/apache/tajo/worker/TaskManager.java     |  19 +-
 .../resources/webapps/worker/taskdetail.jsp     |   4 +-
 tajo-dist/pom.xml                               |   3 +
 tajo-docs/src/main/sphinx/configuration.rst     |   1 +
 .../main/sphinx/configuration/cluster_setup.rst |  18 +-
 .../configuration/pullserver_configuration.rst  |  75 ++
 tajo-project/pom.xml                            |  12 +-
 tajo-pullserver/pom.xml                         |   6 +-
 .../tajo/pullserver/FadvisedFileRegion.java     |   1 +
 .../tajo/pullserver/FileCloseListener.java      |  14 +-
 .../tajo/pullserver/PullServerConstants.java    |  93 +++
 .../apache/tajo/pullserver/PullServerUtil.java  | 688 ++++++++++++++++++-
 .../apache/tajo/pullserver/TajoPullServer.java  |   5 -
 .../tajo/pullserver/TajoPullServerService.java  | 683 +++++-------------
 .../tajo/pullserver/retriever/FileChunk.java    |   4 +-
 .../pullserver/retriever/FileChunkMeta.java     |  53 ++
 .../pullserver/retriever/IndexCacheKey.java     |  63 ++
 tajo-yarn/pom.xml                               | 265 +++++++
 .../apache/tajo/yarn/FadvisedChunkedFile.java   |  82 +++
 .../apache/tajo/yarn/FadvisedFileRegion.java    | 173 +++++
 .../org/apache/tajo/yarn/FileCloseListener.java |  41 ++
 .../apache/tajo/yarn/TajoPullServerService.java | 608 ++++++++++++++++
 49 files changed, 3794 insertions(+), 1470 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 38aedda..b729cce 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,8 @@ Release 0.12.0 - unreleased
 
   NEW FEATURES
 
+    TAJO-2122: PullServer as an Auxiliary service of Yarn. (jihoon)
+
     TAJO-2109: Implement Radix sort. (jihoon)
 
     TAJO-1955: Add a feature to strip quotes from CSV file. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d255652..71c062b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,12 +92,13 @@
     <module>tajo-sql-parser</module>
     <module>tajo-storage</module>
     <module>tajo-pullserver</module>
-    <module>tajo-dist</module>
+    <module>tajo-yarn</module>
     <module>tajo-thirdparty/asm</module>
     <module>tajo-cli</module>
     <module>tajo-metrics</module>
     <module>tajo-core-tests</module>
     <module>tajo-cluster-tests</module>
+    <module>tajo-dist</module>
   </modules>
 
   <build>

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 4e7d236..b1a3306 100644
--- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -162,7 +162,7 @@ public class TajoTestingCluster {
     conf.setInt(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE.varname, 1);
     conf.setInt(ConfVars.$EXECUTOR_HASH_SHUFFLE_BUFFER_SIZE.varname, 1);
 
-    /** decrease Hbase thread and memory cache for testing */
+    /* decrease Hbase thread and memory cache for testing */
     //server handler
     conf.setInt("hbase.regionserver.handler.count", 5);
     //client handler

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 2e2fb18..440af80 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -214,6 +214,7 @@ public class TajoConf extends Configuration {
     PULLSERVER_CACHE_TIMEOUT("tajo.pullserver.index-cache.timeout-min", 5, Validators.min("1")),
     PULLSERVER_FETCH_URL_MAX_LENGTH("tajo.pullserver.fetch-url.max-length", StorageUnit.KB,
         Validators.min("1")),
+    YARN_SHUFFLE_SERVICE_ENABLED("tajo.shuffle.yarn-service.enabled", false, Validators.bool()),
     SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()),
     SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", BuiltinStorages.RAW, Validators.javaString()),
     SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num",

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
index 9a71bd6..957b3d1 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
@@ -34,12 +34,14 @@ public class ErrorUtil {
 
   public static Stacktrace.StackTrace convertStacktrace(Throwable t) {
     Stacktrace.StackTrace.Builder builder = Stacktrace.StackTrace.newBuilder();
-    for (StackTraceElement element : t.getStackTrace()) {
-      builder.addElement(Stacktrace.StackTrace.Element.newBuilder()
-              .setFilename(element.getFileName() == null ? "(Unknown Source)" : element.getFileName())
-              .setFunction(element.getClassName() + "::" + element.getMethodName())
-              .setLine(element.getLineNumber())
-      );
+    if (t != null) {
+      for (StackTraceElement element : t.getStackTrace()) {
+        builder.addElement(Stacktrace.StackTrace.Element.newBuilder()
+            .setFilename(element.getFileName() == null ? "(Unknown Source)" : element.getFileName())
+            .setFunction(element.getClassName() + "::" + element.getMethodName())
+            .setLine(element.getLineNumber())
+        );
+      }
     }
     return builder.build();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-common/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto
index 1795107..78d48c4 100644
--- a/tajo-common/src/main/proto/tajo_protos.proto
+++ b/tajo-common/src/main/proto/tajo_protos.proto
@@ -49,9 +49,11 @@ enum TaskAttemptState {
 
 enum FetcherState {
   FETCH_INIT = 0;
-  FETCH_FETCHING = 1;
-  FETCH_FINISHED = 2;
-  FETCH_FAILED = 3;
+  FETCH_META_FETCHING = 1;
+  FETCH_META_FINISHED = 2;
+  FETCH_DATA_FETCHING = 3;
+  FETCH_DATA_FINISHED = 4;
+  FETCH_FAILED = 5;
 }
 
 message WorkerConnectionInfoProto {

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core-tests/pom.xml b/tajo-core-tests/pom.xml
index b12642a..3554df4 100644
--- a/tajo-core-tests/pom.xml
+++ b/tajo-core-tests/pom.xml
@@ -174,6 +174,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-yarn</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
       <artifactId>tajo-rpc-protobuf</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
index eeb179f..51cd5ea 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -173,13 +173,11 @@ public class TestProgressExternalSortExec {
     while ((tuple = exec.next()) != null) {
       if (cnt == 0) {
         initProgress = exec.getProgress();
-        System.out.println(initProgress);
         assertTrue(initProgress > 0.5f && initProgress < 1.0f);
       }
 
       if (cnt == testDataStats.getNumRows() / 2) {
         float progress = exec.getProgress();
-        System.out.println(progress);
         assertTrue(progress > initProgress);
       }
       curVal = tuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index a13a750..abec6a0 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -20,12 +20,13 @@ package org.apache.tajo.master;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import io.netty.handler.codec.http.QueryStringDecoder;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.ResourceProtos.FetchProto;
 import org.apache.tajo.TestTajoIds;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
 import org.apache.tajo.querymaster.Repartitioner;
 import org.apache.tajo.querymaster.Task;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
@@ -88,12 +89,11 @@ public class TestRepartitioner {
       assertEquals(1, uris.size());   //In Hash Suffle, Fetcher return only one URI per partition.
 
       URI uri = uris.get(0);
-      final Map<String, List<String>> params =
-          new QueryStringDecoder(uri).parameters();
+      final PullServerParams params = new PullServerParams(uri);
 
       assertEquals(eachEntry.getKey().toString(), params.get("p").get(0));
-      assertEquals("h", params.get("type").get(0));
-      assertEquals("" + sid.getId(), params.get("sid").get(0));
+      assertEquals(PullServerConstants.HASH_SHUFFLE_PARAM_STRING, params.shuffleType());
+      assertEquals("" + sid.getId(), params.ebId());
     }
 
     Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> mergedHashEntries =

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index ac5efd9..8d33dbc 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -26,7 +26,6 @@ import org.apache.tajo.*;
 import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.parser.sql.SQLAnalyzer;
@@ -269,7 +268,7 @@ public class TestKillQuery {
       }
     };
 
-    ExecutionBlockContext context = new MockExecutionBlock(workerContext, requestProtoBuilder.build()) {
+    ExecutionBlockContext context = new MockExecutionBlockContext(workerContext, requestProtoBuilder.build()) {
       @Override
       public Path createBaseDir() throws IOException {
         return new Path("test");

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
deleted file mode 100644
index cbc4312..0000000
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
-import org.apache.tajo.TaskAttemptId;
-
-import java.io.IOException;
-
-public class MockExecutionBlock extends ExecutionBlockContext {
-
-  public MockExecutionBlock(TajoWorker.WorkerContext workerContext,
-                            ExecutionBlockContextResponse request) throws IOException {
-    super(workerContext, request, null);
-  }
-
-  @Override
-  public void init() throws Throwable {
-    //skip
-  }
-
-  @Override
-  public void fatalError(TaskAttemptId taskAttemptId, Throwable throwable) {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlockContext.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlockContext.java
new file mode 100644
index 0000000..b64ab9b
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlockContext.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
+import org.apache.tajo.TaskAttemptId;
+
+import java.io.IOException;
+
+public class MockExecutionBlockContext extends ExecutionBlockContext {
+
+  public MockExecutionBlockContext(TajoWorker.WorkerContext workerContext,
+                                   ExecutionBlockContextResponse request) throws IOException {
+    super(workerContext, request, null, null);
+  }
+
+  @Override
+  public void init() throws Throwable {
+    //skip
+  }
+
+  @Override
+  public void fatalError(TaskAttemptId taskAttemptId, Throwable throwable) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
index 071d26a..ea609b1 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
@@ -145,7 +145,7 @@ public class MockTaskExecutor extends TaskExecutor {
       }
 
       @Override
-      public List<Fetcher> getFetchers() {
+      public List<AbstractFetcher> getFetchers() {
         return null;
       }
     };

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
index 5979bbb..0e114bb 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
@@ -29,7 +29,6 @@ import org.apache.tajo.worker.event.TaskManagerEvent;
 
 import java.io.IOException;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeoutException;
 
 public class MockTaskManager extends TaskManager {
 
@@ -61,7 +60,7 @@ public class MockTaskManager extends TaskManager {
           .setQueryContext(new QueryContext(new TajoConf()).getProto())
           .setQueryOutputPath("testpath")
           .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
-      return new MockExecutionBlock(getWorkerContext(), builder.build());
+      return new MockExecutionBlockContext(getWorkerContext(), builder.build());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
deleted file mode 100644
index dfc37b0..0000000
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import org.apache.hadoop.fs.*;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Random;
-
-import static org.junit.Assert.*;
-
-public class TestFetcher {
-  private String TEST_DATA = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestFetcher";
-  private String INPUT_DIR = TEST_DATA+"/in/";
-  private String OUTPUT_DIR = TEST_DATA+"/out/";
-  private TajoConf conf = new TajoConf();
-  private TajoPullServerService pullServerService;
-
-  @Before
-  public void setUp() throws Exception {
-    CommonTestingUtil.getTestDir(TEST_DATA);
-    CommonTestingUtil.getTestDir(INPUT_DIR);
-    CommonTestingUtil.getTestDir(OUTPUT_DIR);
-    conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, INPUT_DIR);
-    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT, 1);
-    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE, 127);
-
-    pullServerService = new TajoPullServerService();
-    pullServerService.init(conf);
-    pullServerService.start();
-  }
-
-  @After
-  public void tearDown(){
-    pullServerService.stop();
-  }
-
-  @Test
-  public void testGet() throws IOException {
-    Random rnd = new Random();
-    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
-    String sid = "1";
-    String partId = "1";
-
-    int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
-    String dataPath = conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) +
-       queryId.toString() + "/output/" + sid + "/hash-shuffle/" + partParentId + "/" + partId;
-
-    String params = String.format("qid=%s&sid=%s&p=%s&type=%s", queryId, sid, partId, "h");
-
-    Path inputPath = new Path(dataPath);
-    FSDataOutputStream stream = FileSystem.getLocal(conf).create(inputPath, true);
-    for (int i = 0; i < 100; i++) {
-      String data = ""+rnd.nextInt();
-      stream.write(data.getBytes());
-    }
-    stream.flush();
-    stream.close();
-
-    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
-    storeChunk.setFromRemote(true);
-    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
-    FileChunk chunk = fetcher.get().get(0);
-    assertNotNull(chunk);
-    assertNotNull(chunk.getFile());
-
-    FileSystem fs = FileSystem.getLocal(new TajoConf());
-    FileStatus inStatus = fs.getFileStatus(inputPath);
-    FileStatus outStatus = fs.getFileStatus(new Path(OUTPUT_DIR, "data"));
-
-    assertEquals(inStatus.getLen(), outStatus.getLen());
-    assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
-  }
-
-  @Test
-  public void testAdjustFetchProcess() {
-    assertEquals(0.0f, TaskImpl.adjustFetchProcess(0, 0), 0);
-    assertEquals(0.0f, TaskImpl.adjustFetchProcess(10, 10), 0);
-    assertEquals(0.05f, TaskImpl.adjustFetchProcess(10, 9), 0);
-    assertEquals(0.1f, TaskImpl.adjustFetchProcess(10, 8), 0);
-    assertEquals(0.25f, TaskImpl.adjustFetchProcess(10, 5), 0);
-    assertEquals(0.45f, TaskImpl.adjustFetchProcess(10, 1), 0);
-    assertEquals(0.5f, TaskImpl.adjustFetchProcess(10, 0), 0);
-  }
-
-  @Test
-  public void testStatus() throws Exception {
-    Random rnd = new Random();
-    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
-    String sid = "1";
-    String ta = "1_0";
-    String partId = "1";
-
-    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
-    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
-
-    FSDataOutputStream stream =  FileSystem.getLocal(conf).create(new Path(dataPath), true);
-    for (int i = 0; i < 100; i++) {
-      String data = ""+rnd.nextInt();
-      stream.write(data.getBytes());
-    }
-    stream.flush();
-    stream.close();
-
-    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
-    storeChunk.setFromRemote(true);
-    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
-    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
-
-    fetcher.get();
-    assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
-  }
-
-  @Test
-  public void testNoContentFetch() throws Exception {
-
-    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
-    String sid = "1";
-    String ta = "1_0";
-    String partId = "1";
-
-    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
-    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
-
-    Path inputPath = new Path(dataPath);
-    FileSystem fs = FileSystem.getLocal(conf);
-    if(fs.exists(inputPath)){
-      fs.delete(new Path(dataPath), true);
-    }
-
-    FSDataOutputStream stream =  FileSystem.getLocal(conf).create(new Path(dataPath).getParent(), true);
-    stream.close();
-
-    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
-    storeChunk.setFromRemote(true);
-    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
-    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
-
-    fetcher.get();
-    assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
-  }
-
-  @Test
-  public void testFailureStatus() throws Exception {
-    Random rnd = new Random();
-
-    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
-    String sid = "1";
-    String ta = "1_0";
-    String partId = "1";
-
-    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
-
-    //TajoPullServerService will be throws BAD_REQUEST by Unknown shuffle type
-    String shuffleType = "x";
-    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, shuffleType, ta);
-
-    FSDataOutputStream stream =  FileSystem.getLocal(conf).create(new Path(dataPath), true);
-
-    for (int i = 0; i < 100; i++) {
-      String data = params + rnd.nextInt();
-      stream.write(data.getBytes());
-    }
-    stream.flush();
-    stream.close();
-
-    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
-    storeChunk.setFromRemote(true);
-    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
-    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
-
-    fetcher.get();
-    assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
-  }
-
-  @Test
-  public void testServerFailure() throws Exception {
-    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
-    String sid = "1";
-    String ta = "1_0";
-    String partId = "1";
-
-    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
-    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
-
-    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
-    storeChunk.setFromRemote(true);
-    final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
-    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
-
-    pullServerService.stop();
-
-    boolean failure = false;
-    try{
-      fetcher.get();
-    } catch (Throwable e){
-      failure = true;
-    }
-    assertTrue(failure);
-    assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcherWithTajoPullServer.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcherWithTajoPullServer.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcherWithTajoPullServer.java
new file mode 100644
index 0000000..8844fce
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcherWithTajoPullServer.java
@@ -0,0 +1,437 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerUtil;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.FetchImpl.RangeParam;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestFetcherWithTajoPullServer {
+  private enum FetchType {
+    LOCAL,
+    REMOTE
+  }
+  private enum PullServerType {
+    TAJO,
+    YARN
+  }
+
+  private final String TEST_DATA = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/" +
+      TestFetcherWithTajoPullServer.class.getSimpleName();
+  private final String INPUT_DIR = TEST_DATA+"/in/";
+  private final String OUTPUT_DIR = TEST_DATA+"/out/";
+  private final TajoConf conf = new TajoConf();
+  private Service pullServerService;
+  private final int maxUrlLength = conf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+  private final String TEST_TABLE_NAME = "test";
+  private final FetchType fetchType;
+  private final PullServerType pullServerType;
+  private int pullserverPort;
+
+  public TestFetcherWithTajoPullServer(FetchType fetchType, PullServerType pullServerType) {
+    this.fetchType = fetchType;
+    this.pullServerType = pullServerType;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    CommonTestingUtil.getTestDir(TEST_DATA);
+    CommonTestingUtil.getTestDir(INPUT_DIR);
+    CommonTestingUtil.getTestDir(OUTPUT_DIR);
+    conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, INPUT_DIR);
+    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT, 1);
+    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE, 127);
+
+    if (pullServerType.equals(PullServerType.TAJO)) {
+      pullServerService = new TajoPullServerService();
+    } else {
+      pullServerService = new org.apache.tajo.yarn.TajoPullServerService();
+    }
+    pullServerService.init(conf);
+    pullServerService.start();
+
+    if (pullServerType.equals(PullServerType.TAJO)) {
+      pullserverPort = ((TajoPullServerService)pullServerService).getPort();
+    } else {
+      pullserverPort = ((org.apache.tajo.yarn.TajoPullServerService)pullServerService).getPort();
+    }
+  }
+
+  @After
+  public void tearDown() {
+    pullServerService.stop();
+  }
+
+  @Parameters(name = "{index}: {0}, {1}")
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][] {
+        {FetchType.LOCAL, PullServerType.TAJO},
+        {FetchType.REMOTE, PullServerType.TAJO},
+        {FetchType.LOCAL, PullServerType.YARN},
+        {FetchType.REMOTE, PullServerType.YARN}
+    });
+  }
+
+  private AbstractFetcher getFetcher(URI uri, File data) throws IOException {
+    if (fetchType.equals(FetchType.LOCAL)) {
+      return new LocalFetcher(conf, uri, TEST_TABLE_NAME);
+    } else {
+      FileChunk storeChunk = new FileChunk(data, 0, data.length());
+      storeChunk.setFromRemote(true);
+      return new RemoteFetcher(conf, uri, storeChunk);
+    }
+  }
+
+  @Test
+  public void testGetHashShuffle() throws IOException {
+    Random rnd = new Random();
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String partId = "1";
+
+    Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+    final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+    final Path dataPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+    PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+        maxUrlLength);
+    builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+        .setQueryId(queryId.toString())
+        .setEbId(sid)
+        .setPartId(partId)
+        .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING);
+
+    Path inputPath = new Path(INPUT_DIR, dataPath);
+    FSDataOutputStream stream = FileSystem.getLocal(conf).create(inputPath, true);
+    for (int i = 0; i < 100; i++) {
+      String data = ""+rnd.nextInt();
+      stream.write(data.getBytes());
+    }
+    stream.flush();
+    stream.close();
+
+    URI uri = builder.build(false).get(0);
+    File data = new File(OUTPUT_DIR + "data");
+
+    final AbstractFetcher fetcher = getFetcher(uri, data);
+
+    FileChunk chunk = fetcher.get().get(0);
+    assertNotNull(chunk);
+    assertNotNull(chunk.getFile());
+
+    FileSystem fs = FileSystem.getLocal(new TajoConf());
+    FileStatus inStatus = fs.getFileStatus(inputPath);
+    FileStatus outStatus = fs.getFileStatus(new Path(chunk.getFile().getAbsolutePath()));
+
+    assertEquals(inStatus.getLen(), outStatus.getLen());
+    assertEquals(FetcherState.FETCH_DATA_FINISHED, fetcher.getState());
+  }
+
+  @Test
+  public void testGetRangeShuffle() throws IOException {
+    Random rnd = new Random();
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String partId = "1";
+    String taskId = "1";
+    String attemptId = "0";
+
+    Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+    Path outDir = StorageUtil.concatPath(queryBaseDir, taskId + "_" + attemptId, "output");
+    Path dataPath = StorageUtil.concatPath(outDir, "output");
+    Path indexPath = StorageUtil.concatPath(outDir, "index");
+
+    List<String> strings = new ArrayList<>(100);
+    for (int i = 0; i < 100; i++) {
+      strings.add("" + rnd.nextInt());
+    }
+    Collections.sort(strings);
+
+    Path inputPath = new Path(INPUT_DIR, dataPath);
+    FileSystem fs = FileSystem.getLocal(conf);
+    if (fs.exists(outDir)) {
+      fs.delete(outDir, true);
+    }
+    final FSDataOutputStream stream = fs.create(inputPath, true);
+    BSTIndex index = new BSTIndex(conf);
+    Schema schema = SchemaBuilder.builder().addAll(new Column[] {new Column("rnd", Type.TEXT)}).build();
+    SortSpec[] sortSpecs = new SortSpec[] {new SortSpec(schema.getColumn(0))};
+    BSTIndexWriter writer = index.getIndexWriter(new Path(INPUT_DIR, indexPath), BSTIndex.TWO_LEVEL_INDEX, schema, new BaseTupleComparator(schema, sortSpecs), true);
+    writer.init();
+
+    for (String t : strings) {
+      writer.write(new VTuple(new Datum[] {DatumFactory.createText(t)}), stream.getPos());
+      stream.write(t.getBytes());
+    }
+    stream.flush();
+    writer.flush();
+    stream.close();
+    writer.close();
+
+    RangeParam rangeParam = new RangeParam(new TupleRange(sortSpecs,
+        new VTuple(new Datum[] {DatumFactory.createText(strings.get(0))}),
+        new VTuple(new Datum[] {DatumFactory.createText(strings.get(strings.size() - 1))})), true, RowStoreUtil.createEncoder(schema));
+    PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+        maxUrlLength);
+    builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+        .setQueryId(queryId.toString())
+        .setEbId(sid)
+        .setPartId(partId)
+        .setShuffleType(PullServerConstants.RANGE_SHUFFLE_PARAM_STRING)
+        .setTaskIds(Lists.newArrayList(Integer.parseInt(taskId)))
+        .setAttemptIds(Lists.newArrayList(Integer.parseInt(attemptId)))
+        .setStartKeyBase64(new String(Base64.encodeBase64(rangeParam.getStart())))
+        .setEndKeyBase64(new String(Base64.encodeBase64(rangeParam.getEnd())))
+        .setLastInclude(true);
+
+    URI uri = builder.build(true).get(0);
+    File data = new File(OUTPUT_DIR + "data");
+
+    final AbstractFetcher fetcher = getFetcher(uri, data);
+
+    FileChunk chunk = fetcher.get().get(0);
+    assertNotNull(chunk);
+    assertNotNull(chunk.getFile());
+
+    FileStatus inStatus = fs.getFileStatus(inputPath);
+    FileStatus outStatus = fs.getFileStatus(new Path(chunk.getFile().getAbsolutePath()));
+
+    assertEquals(inStatus.getLen(), outStatus.getLen());
+    assertEquals(FetcherState.FETCH_DATA_FINISHED, fetcher.getState());
+  }
+
+  @Test
+  public void testAdjustFetchProcess() {
+    Assert.assertEquals(0.0f, TaskImpl.adjustFetchProcess(0, 0), 0);
+    assertEquals(0.0f, TaskImpl.adjustFetchProcess(10, 10), 0);
+    assertEquals(0.05f, TaskImpl.adjustFetchProcess(10, 9), 0);
+    assertEquals(0.1f, TaskImpl.adjustFetchProcess(10, 8), 0);
+    assertEquals(0.25f, TaskImpl.adjustFetchProcess(10, 5), 0);
+    assertEquals(0.45f, TaskImpl.adjustFetchProcess(10, 1), 0);
+    assertEquals(0.5f, TaskImpl.adjustFetchProcess(10, 0), 0);
+  }
+
+  @Test
+  public void testStatus() throws Exception {
+    Random rnd = new Random();
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String ta = "1_0";
+    String partId = "1";
+
+    Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+    final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+    final Path dataPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+    PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+        maxUrlLength);
+    builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+        .setQueryId(queryId.toString())
+        .setEbId(sid)
+        .setPartId(partId)
+        .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING)
+        .setTaskAttemptIds(Lists.newArrayList(ta));
+
+    FSDataOutputStream stream =  FileSystem.getLocal(conf).create(new Path(INPUT_DIR, dataPath), true);
+    for (int i = 0; i < 100; i++) {
+      String data = ""+rnd.nextInt();
+      stream.write(data.getBytes());
+    }
+    stream.flush();
+    stream.close();
+
+    URI uri = builder.build(true).get(0);
+    File data = new File(OUTPUT_DIR + "data");
+    final AbstractFetcher fetcher = getFetcher(uri, data);
+    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+    fetcher.get();
+    assertEquals(FetcherState.FETCH_DATA_FINISHED, fetcher.getState());
+  }
+
+  @Test
+  public void testNoContentFetch() throws Exception {
+
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String ta = "1_0";
+    String partId = "1";
+
+    Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+    final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+    final Path dataPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+    PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+        maxUrlLength);
+    builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+        .setQueryId(queryId.toString())
+        .setEbId(sid)
+        .setPartId(partId)
+        .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING)
+        .setTaskAttemptIds(Lists.newArrayList(ta));
+
+    Path inputPath = new Path(INPUT_DIR, dataPath);
+    FileSystem fs = FileSystem.getLocal(conf);
+    if(fs.exists(inputPath)){
+      fs.delete(inputPath, true);
+    }
+
+    FSDataOutputStream stream =  fs.create(inputPath, true);
+    stream.close();
+
+    URI uri = builder.build(true).get(0);
+    File data = new File(OUTPUT_DIR + "data");
+    final AbstractFetcher fetcher = getFetcher(uri, data);
+    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+    try {
+      fetcher.get();
+      if (fetchType.equals(FetchType.LOCAL)) {
+        fail();
+      }
+    } catch (IOException e) {
+      if (fetchType.equals(FetchType.REMOTE)) {
+        fail();
+      }
+    }
+    assertEquals(FetcherState.FETCH_FAILED, fetcher.getState());
+  }
+
+  @Test
+  public void testFailureStatus() throws Exception {
+    Random rnd = new Random();
+
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String ta = "1_0";
+    String partId = "1";
+
+    Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+    final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+    final Path dataPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+    PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+        maxUrlLength);
+    builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+        .setQueryId(queryId.toString())
+        .setEbId(sid)
+        .setPartId(partId)
+        .setShuffleType("x") // TajoPullServerService will throw BAD_REQUEST for an unknown shuffle type
+        .setTaskAttemptIds(Lists.newArrayList(ta));
+
+    FSDataOutputStream stream =  FileSystem.getLocal(conf).create(new Path(INPUT_DIR, dataPath), true);
+
+    for (int i = 0; i < 100; i++) {
+      String data = "" + rnd.nextInt();
+      stream.write(data.getBytes());
+    }
+    stream.flush();
+    stream.close();
+
+    URI uri = builder.build(true).get(0);
+    File data = new File(OUTPUT_DIR + "data");
+    final AbstractFetcher fetcher = getFetcher(uri, data);
+    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+    try {
+      fetcher.get();
+      if (fetchType.equals(FetchType.LOCAL)) {
+        fail();
+      }
+    } catch (IllegalArgumentException e) {
+      if (!fetchType.equals(FetchType.LOCAL)) {
+        fail();
+      }
+    }
+    assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
+  }
+
+  @Test
+  public void testServerFailure() throws Exception {
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String ta = "1_0";
+    String partId = "1";
+
+    PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+        maxUrlLength);
+    builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+        .setQueryId(queryId.toString())
+        .setEbId(sid)
+        .setPartId(partId)
+        .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING)
+        .setTaskAttemptIds(Lists.newArrayList(ta));
+
+    URI uri = builder.build(true).get(0);
+    File data = new File(OUTPUT_DIR + "data");
+    final AbstractFetcher fetcher = getFetcher(uri, data);
+    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+    pullServerService.stop();
+
+    boolean failure = false;
+    try{
+      fetcher.get();
+    } catch (IOException e){
+      failure = true;
+    }
+    assertTrue(failure);
+    assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
index 45e430e..df5b3c8 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
@@ -305,7 +305,7 @@ public class TestTaskExecutor {
         }
 
         @Override
-        public List<Fetcher> getFetchers() {
+        public List<AbstractFetcher> getFetchers() {
           return null;
         }
       };

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index 2a688e5..ba051a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -50,6 +50,8 @@ import org.apache.tajo.plan.logical.SortNode.SortPurpose;
 import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
 import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
 import org.apache.tajo.querymaster.Task.PullHost;
 import org.apache.tajo.storage.*;
@@ -70,7 +72,6 @@ import java.net.URLEncoder;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*;
@@ -1124,89 +1125,32 @@ public class Repartitioner {
   }
 
   public static List<URI> createFetchURL(int maxUrlLength, FetchProto fetch, boolean includeParts) {
-    String scheme = "http://";
-
-    StringBuilder urlPrefix = new StringBuilder(scheme);
+    PullServerRequestURIBuilder builder =
+        new PullServerRequestURIBuilder(fetch.getHost(), fetch.getPort(), maxUrlLength);
     ExecutionBlockId ebId = new ExecutionBlockId(fetch.getExecutionBlockId());
-    urlPrefix.append(fetch.getHost()).append(":").append(fetch.getPort()).append("/?")
-        .append("qid=").append(ebId.getQueryId().toString())
-        .append("&sid=").append(ebId.getId())
-        .append("&p=").append(fetch.getPartitionId())
-        .append("&type=");
+    builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+        .setQueryId(ebId.getQueryId().toString())
+        .setEbId(ebId.getId())
+        .setPartId(fetch.getPartitionId());
+
     if (fetch.getType() == HASH_SHUFFLE) {
-      urlPrefix.append("h");
+      builder.setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING);
     } else if (fetch.getType() == RANGE_SHUFFLE) {
-      urlPrefix.append("r").append("&").append(getRangeParam(fetch));
+      builder.setShuffleType(PullServerConstants.RANGE_SHUFFLE_PARAM_STRING);
+      builder.setStartKeyBase64(new String(org.apache.commons.codec.binary.Base64.encodeBase64(fetch.getRangeStart().toByteArray())));
+      builder.setEndKeyBase64(new String(org.apache.commons.codec.binary.Base64.encodeBase64(fetch.getRangeEnd().toByteArray())));
+      builder.setLastInclude(fetch.getRangeLastInclusive());
     } else if (fetch.getType() == SCATTERED_HASH_SHUFFLE) {
-      urlPrefix.append("s");
+      builder.setShuffleType(PullServerConstants.SCATTERED_HASH_SHUFFLE_PARAM_STRING);
     }
-
     if (fetch.getLength() >= 0) {
-      urlPrefix.append("&offset=").append(fetch.getOffset()).append("&length=").append(fetch.getLength());
+      builder.setOffset(fetch.getOffset()).setLength(fetch.getLength());
     }
-
-    List<URI> fetchURLs = new ArrayList<>();
-    if(includeParts) {
-      if (fetch.getType() == HASH_SHUFFLE || fetch.getType() == SCATTERED_HASH_SHUFFLE) {
-        fetchURLs.add(URI.create(urlPrefix.toString()));
-      } else {
-        urlPrefix.append("&ta=");
-        // If the get request is longer than 2000 characters,
-        // the long request uri may cause HTTP Status Code - 414 Request-URI Too Long.
-        // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
-        // The below code transforms a long request to multiple requests.
-        List<String> taskIdsParams = new ArrayList<>();
-        StringBuilder taskIdListBuilder = new StringBuilder();
-        
-        final List<Integer> taskIds = fetch.getTaskIdList();
-        final List<Integer> attemptIds = fetch.getAttemptIdList();
-
-        // Sort task ids to increase cache hit in pull server
-        final List<Pair<Integer, Integer>> taskAndAttemptIds = IntStream.range(0, taskIds.size())
-            .mapToObj(i -> new Pair<>(taskIds.get(i), attemptIds.get(i)))
-            .sorted((p1, p2) -> p1.getFirst() - p2.getFirst())
-            .collect(Collectors.toList());
-
-        boolean first = true;
-
-        for (int i = 0; i < taskAndAttemptIds.size(); i++) {
-          StringBuilder taskAttemptId = new StringBuilder();
-
-          if (!first) { // when comma is added?
-            taskAttemptId.append(",");
-          } else {
-            first = false;
-          }
-
-          int taskId = taskAndAttemptIds.get(i).getFirst();
-          if (taskId < 0) {
-            // In the case of hash shuffle each partition has single shuffle file per worker.
-            // TODO If file is large, consider multiple fetching(shuffle file can be split)
-            continue;
-          }
-          int attemptId = taskAndAttemptIds.get(i).getSecond();
-          taskAttemptId.append(taskId).append("_").append(attemptId);
-
-          if (urlPrefix.length() + taskIdListBuilder.length() > maxUrlLength) {
-            taskIdsParams.add(taskIdListBuilder.toString());
-            taskIdListBuilder = new StringBuilder(taskId + "_" + attemptId);
-          } else {
-            taskIdListBuilder.append(taskAttemptId);
-          }
-        }
-        // if the url params remain
-        if (taskIdListBuilder.length() > 0) {
-          taskIdsParams.add(taskIdListBuilder.toString());
-        }
-        for (String param : taskIdsParams) {
-          fetchURLs.add(URI.create(urlPrefix + param));
-        }
-      }
-    } else {
-      fetchURLs.add(URI.create(urlPrefix.toString()));
+    if (includeParts) {
+      builder.setTaskIds(fetch.getTaskIdList());
+      builder.setAttemptIds(fetch.getAttemptIdList());
     }
-
-    return fetchURLs;
+    return builder.build(includeParts);
   }
 
   public static Map<Integer, List<IntermediateEntry>> hashByKey(List<IntermediateEntry> entries) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractFetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractFetcher.java
new file mode 100644
index 0000000..a12db77
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractFetcher.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+public abstract class AbstractFetcher {
+
+  protected final URI uri;
+  protected FileChunk fileChunk;
+  protected final TajoConf conf;
+
+  protected TajoProtos.FetcherState state;
+
+  protected long startTime;
+  protected volatile long finishTime;
+  protected int fileNum;
+  protected long fileLen;
+  protected int messageReceiveCount;
+
+  public AbstractFetcher(TajoConf conf, URI uri) {
+    this(conf, uri, null);
+  }
+
+  public AbstractFetcher(TajoConf conf, URI uri, FileChunk fileChunk) {
+    this.conf = conf;
+    this.uri = uri;
+    this.fileChunk = fileChunk;
+    this.state = TajoProtos.FetcherState.FETCH_INIT;
+  }
+
+  public URI getURI() {
+    return this.uri;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public long getFileLen() {
+    return fileLen;
+  }
+
+  public int getFileNum() {
+    return fileNum;
+  }
+
+  public TajoProtos.FetcherState getState() {
+    return state;
+  }
+
+  public int getMessageReceiveCount() {
+    return messageReceiveCount;
+  }
+
+  public abstract List<FileChunk> get() throws IOException;
+
+  protected void endFetch(FetcherState state) {
+    this.finishTime = System.currentTimeMillis();
+    this.state = state;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index e675d70..4ab6627 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -37,6 +37,7 @@ import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
+import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.ErrorUtil;
@@ -44,6 +45,7 @@ import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.pullserver.PullServerUtil;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.rpc.*;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
@@ -103,7 +105,7 @@ public class ExecutionBlockContext {
   private final Map<TaskId, TaskHistory> taskHistories = Maps.newConcurrentMap();
 
   public ExecutionBlockContext(TajoWorker.WorkerContext workerContext, ExecutionBlockContextResponse request,
-                               AsyncRpcClient queryMasterClient)
+                               AsyncRpcClient queryMasterClient, @Nullable TajoPullServerService pullServerService)
       throws IOException {
     this.executionBlockId = new ExecutionBlockId(request.getExecutionBlockId());
     this.connManager = RpcClientManager.getInstance();
@@ -117,7 +119,7 @@ public class ExecutionBlockContext {
     this.queryEngine = new TajoQueryEngine(systemConf);
     this.queryContext = new QueryContext(workerContext.getConf(), request.getQueryContext());
     this.plan = request.getPlanJson();
-    this.resource = new ExecutionBlockSharedResource();
+    this.resource = new ExecutionBlockSharedResource(pullServerService);
     this.workerContext = workerContext;
     this.shuffleType = request.getShuffleType();
     this.queryMasterClient = queryMasterClient;
@@ -281,12 +283,12 @@ public class ExecutionBlockContext {
   }
 
   public static Path getBaseOutputDir(ExecutionBlockId executionBlockId) {
-    return TajoPullServerService.getBaseOutputDir(
+    return PullServerUtil.getBaseOutputDir(
         executionBlockId.getQueryId().toString(), String.valueOf(executionBlockId.getId()));
   }
 
   public static Path getBaseInputDir(ExecutionBlockId executionBlockId) {
-    return TajoPullServerService.getBaseInputDir(executionBlockId.getQueryId().toString(), executionBlockId.toString());
+    return PullServerUtil.getBaseInputDir(executionBlockId.getQueryId().toString(), executionBlockId.toString());
   }
 
   public ExecutionBlockId getExecutionBlockId() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
index 660f875..e1ff917 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.SessionVars;
+import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.engine.codegen.ExecutorPreCompiler;
 import org.apache.tajo.engine.codegen.TajoClassLoader;
@@ -35,8 +36,10 @@ import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.util.Pair;
 
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class ExecutionBlockSharedResource {
@@ -53,6 +56,15 @@ public class ExecutionBlockSharedResource {
   private ExecutorPreCompiler.CompilationContext compilationContext;
   private LogicalNode plan;
   private boolean codeGenEnabled = false;
+  private final TajoPullServerService pullServerService;
+
+  public ExecutionBlockSharedResource() {
+    this(null);
+  }
+
+  public ExecutionBlockSharedResource(@Nullable TajoPullServerService pullServerService) {
+    this.pullServerService = pullServerService;
+  }
 
   public void initialize(final QueryContext context, final String planJson) {
 
@@ -130,6 +142,10 @@ public class ExecutionBlockSharedResource {
     TableCache.getInstance().releaseCache(id);
   }
 
+  public Optional<TajoPullServerService> getPullServerService() {
+    return pullServerService == null ? Optional.empty() : Optional.of(pullServerService);
+  }
+
   public void release() {
     compilationContext = null;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
deleted file mode 100644
index 250b4cc..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.*;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.http.*;
-import io.netty.handler.timeout.ReadTimeoutException;
-import io.netty.handler.timeout.ReadTimeoutHandler;
-import io.netty.util.ReferenceCountUtil;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.TajoProtos.FetcherState;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.rpc.NettyUtils;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Fetcher fetches data from a given uri via HTTP protocol and stores them into
- * a specific file. It aims at asynchronous and efficient data transmit.
- */
-public class Fetcher {
-
-  private final static Log LOG = LogFactory.getLog(Fetcher.class);
-
-  private final URI uri;
-  private final FileChunk fileChunk;
-  private final TajoConf conf;
-
-  private final String host;
-  private int port;
-  private final boolean useLocalFile;
-
-  private long startTime;
-  private volatile long finishTime;
-  private long fileLen;
-  private int messageReceiveCount;
-  private TajoProtos.FetcherState state;
-
-  private Bootstrap bootstrap;
-  private List<Long> chunkLengths = new ArrayList<>();
-
-  public Fetcher(TajoConf conf, URI uri, FileChunk chunk) {
-    this.uri = uri;
-    this.fileChunk = chunk;
-    this.useLocalFile = !chunk.fromRemote();
-    this.state = TajoProtos.FetcherState.FETCH_INIT;
-    this.conf = conf;
-
-    String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
-    this.host = uri.getHost() == null ? "localhost" : uri.getHost();
-    this.port = uri.getPort();
-    if (port == -1) {
-      if (scheme.equalsIgnoreCase("http")) {
-        this.port = 80;
-      } else if (scheme.equalsIgnoreCase("https")) {
-        this.port = 443;
-      }
-    }
-
-    if (!useLocalFile) {
-      bootstrap = new Bootstrap()
-          .group(
-              NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
-                  conf.getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
-          .channel(NioSocketChannel.class)
-          .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
-          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
-              conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
-          .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
-          .option(ChannelOption.TCP_NODELAY, true);
-    }
-  }
-
-  public long getStartTime() {
-    return startTime;
-  }
-
-  public long getFinishTime() {
-    return finishTime;
-  }
-
-  public long getFileLen() {
-    return fileLen;
-  }
-
-  public TajoProtos.FetcherState getState() {
-    return state;
-  }
-
-  public int getMessageReceiveCount() {
-    return messageReceiveCount;
-  }
-
-  public List<FileChunk> get() throws IOException {
-    List<FileChunk> fileChunks = new ArrayList<>();
-    if (useLocalFile) {
-      startTime = System.currentTimeMillis();
-      finishTime = System.currentTimeMillis();
-      state = TajoProtos.FetcherState.FETCH_FINISHED;
-      fileChunks.add(fileChunk);
-      fileLen = fileChunk.getFile().length();
-      return fileChunks;
-    }
-
-    if (state == FetcherState.FETCH_INIT) {
-      ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile());
-      bootstrap.handler(initializer);
-    }
-
-    this.startTime = System.currentTimeMillis();
-    this.state = TajoProtos.FetcherState.FETCH_FETCHING;
-    ChannelFuture future = null;
-    try {
-      future = bootstrap.clone().connect(new InetSocketAddress(host, port))
-              .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
-
-      // Wait until the connection attempt succeeds or fails.
-      Channel channel = future.awaitUninterruptibly().channel();
-      if (!future.isSuccess()) {
-        state = TajoProtos.FetcherState.FETCH_FAILED;
-        throw new IOException(future.cause());
-      }
-
-      String query = uri.getPath()
-          + (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : "");
-      // Prepare the HTTP request.
-      HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query);
-      request.headers().set(HttpHeaders.Names.HOST, host);
-      request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
-      request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
-
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Status: " + getState() + ", URI:" + uri);
-      }
-      // Send the HTTP request.
-      channel.writeAndFlush(request);
-
-      // Wait for the server to close the connection. throw exception if failed
-      channel.closeFuture().syncUninterruptibly();
-
-      fileChunk.setLength(fileChunk.getFile().length());
-
-      long start = 0;
-      for (Long eachChunkLength : chunkLengths) {
-        if (eachChunkLength == 0) continue;
-        FileChunk chunk = new FileChunk(fileChunk.getFile(), start, eachChunkLength);
-        chunk.setEbId(fileChunk.getEbId());
-        chunk.setFromRemote(fileChunk.fromRemote());
-        fileChunks.add(chunk);
-        start += eachChunkLength;
-      }
-      return fileChunks;
-
-    } finally {
-      if(future != null && future.channel().isOpen()){
-        // Close the channel to exit.
-        future.channel().close().awaitUninterruptibly();
-      }
-
-      this.finishTime = System.currentTimeMillis();
-      long elapsedMills = finishTime - startTime;
-      String transferSpeed;
-      if(elapsedMills > 1000) {
-        long bytePerSec = (fileChunk.length() * 1000) / elapsedMills;
-        transferSpeed = FileUtils.byteCountToDisplaySize(bytePerSec);
-      } else {
-        transferSpeed = FileUtils.byteCountToDisplaySize(Math.max(fileChunk.length(), 0));
-      }
-
-      LOG.info(String.format("Fetcher :%d ms elapsed. %s/sec, len:%d, state:%s, URL:%s",
-          elapsedMills, transferSpeed, fileChunk.length(), getState(), uri));
-    }
-  }
-
-  public URI getURI() {
-    return this.uri;
-  }
-
-  class HttpClientHandler extends ChannelInboundHandlerAdapter {
-    private final File file;
-    private RandomAccessFile raf;
-    private FileChannel fc;
-    private long length = -1;
-
-    public HttpClientHandler(File file) throws FileNotFoundException {
-      this.file = file;
-      this.raf = new RandomAccessFile(file, "rw");
-      this.fc = raf.getChannel();
-    }
-
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg)
-        throws Exception {
-
-      messageReceiveCount++;
-      if (msg instanceof HttpResponse) {
-        try {
-          HttpResponse response = (HttpResponse) msg;
-
-          StringBuilder sb = new StringBuilder();
-          if (LOG.isDebugEnabled()) {
-            sb.append("STATUS: ").append(response.getStatus()).append(", VERSION: ")
-                .append(response.getProtocolVersion()).append(", HEADER: ");
-          }
-          if (!response.headers().names().isEmpty()) {
-            for (String name : response.headers().names()) {
-              for (String value : response.headers().getAll(name)) {
-                if (LOG.isDebugEnabled()) {
-                  sb.append(name).append(" = ").append(value);
-                }
-                if (this.length == -1 && name.equals("Content-Length")) {
-                  this.length = Long.parseLong(value);
-                }
-              }
-            }
-            if (response.headers().contains(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME)) {
-              String stringOffset = response.headers().get(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME);
-
-              for (String eachSplit : stringOffset.split(",")) {
-                chunkLengths.add(Long.parseLong(eachSplit));
-              }
-            }
-          }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(sb.toString());
-          }
-
-          if (response.getStatus().code() == HttpResponseStatus.NO_CONTENT.code()) {
-            LOG.warn("There are no data corresponding to the request");
-            length = 0;
-            return;
-          } else if (response.getStatus().code() != HttpResponseStatus.OK.code()) {
-            LOG.error(response.getStatus().reasonPhrase());
-            state = TajoProtos.FetcherState.FETCH_FAILED;
-            return;
-          }
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-        } finally {
-          ReferenceCountUtil.release(msg);
-        }
-      }
-
-      if (msg instanceof HttpContent) {
-        try {
-          HttpContent httpContent = (HttpContent) msg;
-          ByteBuf content = httpContent.content();
-          if (content.isReadable()) {
-            content.readBytes(fc, content.readableBytes());
-          }
-
-          if (msg instanceof LastHttpContent) {
-            if (raf != null) {
-              fileLen = file.length();
-            }
-
-            finishTime = System.currentTimeMillis();
-            if (state != TajoProtos.FetcherState.FETCH_FAILED) {
-              state = TajoProtos.FetcherState.FETCH_FINISHED;
-            }
-
-            IOUtils.cleanup(LOG, fc, raf);
-          }
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-        } finally {
-          ReferenceCountUtil.release(msg);
-        }
-      }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-        throws Exception {
-      if (cause instanceof ReadTimeoutException) {
-        LOG.warn(cause.getMessage(), cause);
-      } else {
-        LOG.error("Fetch failed :", cause);
-      }
-
-      // this fetching will be retry
-      IOUtils.cleanup(LOG, fc, raf);
-      finishTime = System.currentTimeMillis();
-      state = TajoProtos.FetcherState.FETCH_FAILED;
-      ctx.close();
-    }
-
-    @Override
-    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
-      if(getState() != TajoProtos.FetcherState.FETCH_FINISHED){
-        //channel is closed, but cannot complete fetcher
-        finishTime = System.currentTimeMillis();
-        LOG.error("Channel closed by peer: " + ctx.channel());
-        state = TajoProtos.FetcherState.FETCH_FAILED;
-      }
-      IOUtils.cleanup(LOG, fc, raf);
-      
-      super.channelUnregistered(ctx);
-    }
-  }
-
-  class HttpClientChannelInitializer extends ChannelInitializer<Channel> {
-    private final File file;
-
-    public HttpClientChannelInitializer(File file) {
-      this.file = file;
-    }
-
-    @Override
-    protected void initChannel(Channel channel) throws Exception {
-      ChannelPipeline pipeline = channel.pipeline();
-
-      int maxChunkSize = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE);
-      int readTimeout = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT);
-
-      pipeline.addLast("codec", new HttpClientCodec(4096, 8192, maxChunkSize));
-      pipeline.addLast("inflater", new HttpContentDecompressor());
-      pipeline.addLast("timeout", new ReadTimeoutHandler(readTimeout, TimeUnit.SECONDS));
-      pipeline.addLast("handler", new HttpClientHandler(file));
-    }
-  }
-}