You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/07/02 16:16:16 UTC

[22/51] [partial] TAJO-22: The package prefix should be org.apache.tajo. (DaeMyung Kang via hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
new file mode 100644
index 0000000..4934633
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tajo.QueryUnitAttemptId;
+
+/**
+ * Task-attempt event of type {@link TaskAttemptEventType#TA_ASSIGNED}.
+ * Besides the attempt id it carries the container id, host name and pull
+ * server port handed to the constructor.
+ */
+public class TaskAttemptAssignedEvent extends TaskAttemptEvent {
+  private final ContainerId containerId;
+  private final String hostName;
+  private final int pullServerPort;
+
+  /**
+   * @param id attempt identifier passed on to {@link TaskAttemptEvent}
+   * @param cId value returned by {@link #getContainerId()}
+   * @param hostname value returned by {@link #getHostName()}
+   * @param pullServerPort value returned by {@link #getPullServerPort()}
+   */
+  public TaskAttemptAssignedEvent(QueryUnitAttemptId id, ContainerId cId,
+                                  String hostname, int pullServerPort) {
+    super(id, TaskAttemptEventType.TA_ASSIGNED);
+    containerId = cId;
+    hostName = hostname;
+    this.pullServerPort = pullServerPort;
+  }
+
+  /** @return the container id given at construction */
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+  /** @return the host name given at construction */
+  public String getHostName() {
+    return this.hostName;
+  }
+
+  /** @return the pull server port given at construction */
+  public int getPullServerPort() {
+    return this.pullServerPort;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java
new file mode 100644
index 0000000..f2df144
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.QueryUnitAttemptId;
+
+/**
+ * Base event for a single task attempt: pairs a
+ * {@link TaskAttemptEventType} with the {@link QueryUnitAttemptId} it
+ * refers to.
+ */
+public class TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType> {
+  private final QueryUnitAttemptId attemptId;
+
+  /**
+   * @param id identifier returned by {@link #getTaskAttemptId()}
+   * @param taskAttemptEventType event type handed to {@link AbstractEvent}
+   */
+  public TaskAttemptEvent(QueryUnitAttemptId id,
+                          TaskAttemptEventType taskAttemptEventType) {
+    super(taskAttemptEventType);
+    attemptId = id;
+  }
+
+  /** @return the attempt identifier given at construction */
+  public QueryUnitAttemptId getTaskAttemptId() {
+    return attemptId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
new file mode 100644
index 0000000..d9d2f13
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+/**
+ * Event types handled by TaskAttempt.
+ *
+ * <p>Constants are grouped by producer; the comment above each group names
+ * the component that emits those events. Declaration order determines the
+ * ordinals, so new constants should not be inserted casually.</p>
+ */
+public enum TaskAttemptEventType {
+
+  // Producer: Task
+  TA_SCHEDULE,
+  TA_RESCHEDULE,
+
+  // Producer: Client, Task
+  TA_KILL,
+
+  // Producer: Scheduler (type carried by TaskAttemptAssignedEvent)
+  TA_ASSIGNED,
+
+  // Producer: Scheduler
+  TA_LAUNCHED,
+
+  // Producer: TaskAttemptListener
+  // TA_DONE is carried by TaskCompletionEvent, TA_FATAL_ERROR by
+  // TaskFatalErrorEvent and TA_UPDATE by TaskAttemptStatusUpdateEvent.
+  TA_DIAGNOSTICS_UPDATE,
+  TA_COMMIT_PENDING,
+  TA_DONE,
+  TA_FATAL_ERROR,
+  TA_UPDATE,
+  TA_TIMED_OUT,
+
+  // Producer: TaskCleaner
+  TA_CLEANUP_DONE,
+
+  // Producer: Job
+  TA_TOO_MANY_FETCH_FAILURE,
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
new file mode 100644
index 0000000..6409b43
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.engine.MasterWorkerProtos.TaskStatusProto;
+
+/**
+ * Task-attempt event of type {@link TaskAttemptEventType#TA_UPDATE} that
+ * carries the {@link TaskStatusProto} supplied by the event producer.
+ */
+public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
+  private final TaskStatusProto taskStatus;
+
+  /**
+   * @param id attempt identifier passed on to {@link TaskAttemptEvent}
+   * @param status status message returned by {@link #getStatus()}
+   */
+  public TaskAttemptStatusUpdateEvent(final QueryUnitAttemptId id,
+                                      TaskStatusProto status) {
+    super(id, TaskAttemptEventType.TA_UPDATE);
+    taskStatus = status;
+  }
+
+  /** @return the status message given at construction */
+  public TaskStatusProto getStatus() {
+    return this.taskStatus;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
new file mode 100644
index 0000000..b36d69c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.engine.MasterWorkerProtos.TaskCompletionReport;
+
+/**
+ * Task-attempt event of type {@link TaskAttemptEventType#TA_DONE} built from
+ * a {@link TaskCompletionReport}. The attempt id is derived from
+ * {@code report.getId()}.
+ */
+public class TaskCompletionEvent extends TaskAttemptEvent {
+  // Assigned exactly once; final keeps the event immutable like its siblings.
+  private final TaskCompletionReport report;
+
+  /**
+   * @param report completion report; must be non-null because its id is read
+   *               to build the attempt identifier
+   */
+  public TaskCompletionEvent(TaskCompletionReport report) {
+    super(new QueryUnitAttemptId(report.getId()), TaskAttemptEventType.TA_DONE);
+    this.report = report;
+  }
+
+  /** @return the completion report given at construction */
+  public TaskCompletionReport getReport() {
+    return report;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskEvent.java
new file mode 100644
index 0000000..234491b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.QueryUnitId;
+
+/**
+ * Base event for a task: pairs a {@link TaskEventType} with the
+ * {@link QueryUnitId} it refers to.
+ */
+public class TaskEvent extends AbstractEvent<TaskEventType> {
+  private final QueryUnitId taskId;
+
+  /**
+   * @param id identifier returned by {@link #getTaskId()}
+   * @param taskEventType event type handed to {@link AbstractEvent}
+   */
+  public TaskEvent(QueryUnitId id, TaskEventType taskEventType) {
+    super(taskEventType);
+    taskId = id;
+  }
+
+  /** @return the task identifier given at construction */
+  public QueryUnitId getTaskId() {
+    return this.taskId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskEventType.java
new file mode 100644
index 0000000..9448863
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskEventType.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+/**
+ * Event types handled by Task.
+ *
+ * <p>Constants are grouped by producer; the comment above each group names
+ * the component that emits those events.</p>
+ */
+public enum TaskEventType {
+
+  // Producer: Client, SubQuery
+  T_KILL,
+
+  // Producer: SubQuery
+  T_SCHEDULE,
+
+  // Producer: TaskAttempt
+  T_ATTEMPT_LAUNCHED,
+  T_ATTEMPT_COMMIT_PENDING,
+  T_ATTEMPT_FAILED,
+  T_ATTEMPT_SUCCEEDED,
+  T_ATTEMPT_KILLED
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
new file mode 100644
index 0000000..3d1c78d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
+
+/**
+ * Task-attempt event of type {@link TaskAttemptEventType#TA_FATAL_ERROR}
+ * built from a {@link TaskFatalErrorReport}. The attempt id is derived from
+ * {@code report.getId()}.
+ */
+public class TaskFatalErrorEvent extends TaskAttemptEvent {
+  // Assigned exactly once; final keeps the event immutable like its siblings.
+  private final TaskFatalErrorReport report;
+
+  /**
+   * @param report fatal error report; must be non-null because its id is
+   *               read to build the attempt identifier
+   */
+  public TaskFatalErrorEvent(TaskFatalErrorReport report) {
+    super(new QueryUnitAttemptId(report.getId()),
+        TaskAttemptEventType.TA_FATAL_ERROR);
+    this.report = report;
+  }
+
+  /** @return the error message stored in the wrapped report */
+  public String errorMessage() {
+    return report.getErrorMessage();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
new file mode 100644
index 0000000..25a8a14
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import com.google.protobuf.RpcCallback;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
+import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType;
+
+/**
+ * Event of type {@link TaskRequestEventType#TASK_REQ} that carries a
+ * container id together with the RPC callback expecting a
+ * {@link QueryUnitRequestProto}.
+ */
+public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
+
+  /** The single event type this class is dispatched with. */
+  public enum TaskRequestEventType {
+    TASK_REQ
+  }
+
+  private final ContainerId containerId;
+  private final RpcCallback<QueryUnitRequestProto> responder;
+
+  /**
+   * @param workerId container id returned by {@link #getContainerId()}
+   * @param callback callback returned by {@link #getCallback()}
+   */
+  public TaskRequestEvent(ContainerId workerId,
+                          RpcCallback<QueryUnitRequestProto> callback) {
+    super(TaskRequestEventType.TASK_REQ);
+    containerId = workerId;
+    responder = callback;
+  }
+
+  /** @return the container id given at construction */
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  /** @return the RPC callback given at construction */
+  public RpcCallback<QueryUnitRequestProto> getCallback() {
+    return responder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
new file mode 100644
index 0000000..03b2aba
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.QueryUnitAttemptId;
+
+import java.util.Arrays;
+
+/**
+ * Scheduler event describing one task attempt to schedule, together with a
+ * leaf-query flag and the host and rack arrays supplied by the producer.
+ * The sub query id of the parent event is taken from the attempt id.
+ *
+ * <p>The host and rack arrays are stored and returned without copying.</p>
+ */
+public class TaskScheduleEvent extends TaskSchedulerEvent {
+  private final QueryUnitAttemptId attemptId;
+  private final boolean isLeafQuery;
+  private final String[] hosts;
+  private final String[] racks;
+
+  /**
+   * @param attemptId attempt to schedule; must be non-null because its sub
+   *                  query id is read for the parent event
+   * @param eventType scheduler event type handed to the parent event
+   * @param isLeafQuery flag returned by {@link #isLeafQuery()}
+   * @param hosts array returned by {@link #getHosts()}; may be null
+   * @param racks array returned by {@link #getRacks()}; may be null
+   */
+  public TaskScheduleEvent(final QueryUnitAttemptId attemptId,
+                           final EventType eventType, boolean isLeafQuery,
+                           final String[] hosts,
+                           final String[] racks) {
+    super(eventType, attemptId.getSubQueryId());
+    this.racks = racks;
+    this.hosts = hosts;
+    this.isLeafQuery = isLeafQuery;
+    this.attemptId = attemptId;
+  }
+
+  /** @return the attempt id given at construction */
+  public QueryUnitAttemptId getAttemptId() {
+    return attemptId;
+  }
+
+  /** @return the leaf-query flag given at construction */
+  public boolean isLeafQuery() {
+    return isLeafQuery;
+  }
+
+  /** @return the host array given at construction (not a copy) */
+  public String [] getHosts() {
+    return hosts;
+  }
+
+  /** @return the rack array given at construction (not a copy) */
+  public String [] getRacks() {
+    return racks;
+  }
+
+  @Override
+  public String toString() {
+    // Arrays.toString prints "null" for a null array and "[a, b]" otherwise.
+    StringBuilder text = new StringBuilder("TaskScheduleEvent{");
+    text.append("attemptId=").append(attemptId);
+    text.append(", isLeafQuery=").append(isLeafQuery);
+    text.append(", hosts=").append(Arrays.toString(hosts));
+    text.append(", racks=").append(Arrays.toString(racks));
+    text.append('}');
+    return text.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
new file mode 100644
index 0000000..d73bb87
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.SubQueryId;
+import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+
+/**
+ * Base event for the task scheduler: pairs an {@link EventType} with the
+ * {@link SubQueryId} it refers to.
+ */
+public class TaskSchedulerEvent extends AbstractEvent<EventType> {
+
+  /** Event types this class is dispatched with. */
+  public enum EventType {
+    T_SCHEDULE,
+    T_SUBQUERY_COMPLETED
+  }
+
+  private final SubQueryId subQueryId;
+
+  /**
+   * @param eventType event type handed to {@link AbstractEvent}
+   * @param subQueryId identifier returned by {@link #getSubQueryId()}
+   */
+  public TaskSchedulerEvent(EventType eventType, SubQueryId subQueryId) {
+    super(eventType);
+    this.subQueryId = subQueryId;
+  }
+
+  /** @return the sub query identifier given at construction */
+  public SubQueryId getSubQueryId() {
+    return subQueryId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java
new file mode 100644
index 0000000..28654f0
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.QueryUnitAttemptId;
+
+/**
+ * Task event that also remembers which attempt it concerns. The task id of
+ * the parent {@link TaskEvent} is taken from the attempt id.
+ */
+public class TaskTAttemptEvent extends TaskEvent {
+  private final QueryUnitAttemptId taskAttemptId;
+
+  /**
+   * @param attemptId attempt the event concerns; must be non-null because
+   *                  its query unit id is read for the parent event
+   * @param eventType task event type handed to {@link TaskEvent}
+   */
+  public TaskTAttemptEvent(QueryUnitAttemptId attemptId,
+                           TaskEventType eventType) {
+    super(attemptId.getQueryUnitId(), eventType);
+    taskAttemptId = attemptId;
+  }
+
+  /** @return the attempt id given at construction */
+  public QueryUnitAttemptId getTaskAttemptId() {
+    return this.taskAttemptId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
new file mode 100644
index 0000000..c615532
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.AMRMClientImpl;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.tajo.SubQueryId;
+import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.master.QueryMaster.QueryContext;
+import org.apache.tajo.master.SubQueryState;
+import org.apache.tajo.master.event.ContainerAllocationEvent;
+import org.apache.tajo.master.event.ContainerAllocatorEventType;
+import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class RMContainerAllocator extends AMRMClientImpl
+    implements EventHandler<ContainerAllocationEvent> {
+
+  /** Class Logger */
+  private static final Log LOG = LogFactory.getLog(RMContainerAllocator.
+      class.getName());
+
+  private QueryContext context;
+  private final EventHandler eventHandler;
+
+  public RMContainerAllocator(QueryContext context) {
+    super(context.getApplicationAttemptId());
+    this.context = context;
+    this.eventHandler = context.getDispatcher().getEventHandler();
+  }
+
+  public void init(Configuration conf) {
+    super.init(conf);
+  }
+
+  private static final int WAIT_INTERVAL_AVAILABLE_NODES = 500; // 0.5 second
+  public void start() {
+    super.start();
+
+    RegisterApplicationMasterResponse response;
+    try {
+      response = registerApplicationMaster("locahost", 10080, "http://localhost:1234");
+      context.setMaxContainerCapability(response.getMaximumResourceCapability().getMemory());
+      context.setMinContainerCapability(response.getMinimumResourceCapability().getMemory());
+
+      // If the number of cluster nodes is ZERO, it waits for available nodes.
+      AllocateResponse allocateResponse = allocate(0.0f);
+      while(allocateResponse.getNumClusterNodes() < 1) {
+        try {
+          Thread.sleep(WAIT_INTERVAL_AVAILABLE_NODES);
+          LOG.info("Waiting for Available Cluster Nodes");
+          allocateResponse = allocate(0);
+        } catch (InterruptedException e) {
+          LOG.error(e);
+        }
+      }
+      context.setNumClusterNodes(allocateResponse.getNumClusterNodes());
+    } catch (YarnRemoteException e) {
+      LOG.error(e);
+    }
+
+    startAllocatorThread();
+  }
+
+  protected Thread allocatorThread;
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+  private int rmPollInterval = 1000;//millis
+  protected void startAllocatorThread() {
+    allocatorThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+          try {
+            try {
+              heartbeat();
+            } catch (YarnException e) {
+              LOG.error("Error communicating with RM: " + e.getMessage() , e);
+              return;
+            } catch (Exception e) {
+              LOG.error("ERROR IN CONTACTING RM. ", e);
+              // TODO: for other exceptions
+            }
+            Thread.sleep(rmPollInterval);
+          } catch (InterruptedException e) {
+            if (!stopped.get()) {
+              LOG.warn("Allocated thread interrupted. Returning.");
+            }
+            return;
+          }
+        }
+      }
+    });
+    allocatorThread.setName("RMContainerAllocator");
+    allocatorThread.start();
+  }
+
+  public void stop() {
+    stopped.set(true);
+    super.stop();
+    FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
+    QueryState state = context.getQuery().getState();
+    if (state == QueryState.QUERY_SUCCEEDED) {
+      finishState = FinalApplicationStatus.SUCCEEDED;
+    } else if (state == QueryState.QUERY_KILLED
+        || (state == QueryState.QUERY_RUNNING)) {
+      finishState = FinalApplicationStatus.KILLED;
+    } else if (state == QueryState.QUERY_FAILED
+        || state == QueryState.QUERY_ERROR) {
+      finishState = FinalApplicationStatus.FAILED;
+    }
+
+    try {
+      unregisterApplicationMaster(finishState, "", "http://localhost:1234");
+    } catch (YarnRemoteException e) {
+      LOG.error(e);
+    }
+  }
+
+  private final Map<Priority, SubQueryId> subQueryMap =
+      new HashMap<Priority, SubQueryId>();
+
+  public void heartbeat() throws Exception {
+    AllocateResponse allocateResponse = allocate(context.getProgress());
+    AMResponse response = allocateResponse.getAMResponse();
+    List<Container> allocatedContainers = response.getAllocatedContainers();
+
+    LOG.info("Available Cluster Nodes: " + allocateResponse.getNumClusterNodes());
+    LOG.info("Available Resource: " + response.getAvailableResources());
+    LOG.info("Num of Allocated Containers: " + response.getAllocatedContainers().size());
+    if (response.getAllocatedContainers().size() > 0) {
+      LOG.info("================================================================");
+      for (Container container : response.getAllocatedContainers()) {
+        LOG.info("> Container Id: " + container.getId());
+        LOG.info("> Node Id: " + container.getNodeId());
+        LOG.info("> Resource (Mem): " + container.getResource().getMemory());
+        LOG.info("> State : " + container.getState());
+        LOG.info("> Priority: " + container.getPriority());
+      }
+      LOG.info("================================================================");
+    }
+
+    Map<SubQueryId, List<Container>> allocated = new HashMap<SubQueryId, List<Container>>();
+    if (allocatedContainers.size() > 0) {
+      for (Container container : allocatedContainers) {
+        SubQueryId subQueryId = subQueryMap.get(container.getPriority());
+        SubQueryState state = context.getSubQuery(subQueryId).getState();
+        if (!(isRunningState(state) && subQueryMap.containsKey(container.getPriority()))) {
+          releaseAssignedContainer(container.getId());
+          synchronized (subQueryMap) {
+            subQueryMap.remove(container.getPriority());
+          }
+        } else {
+          if (allocated.containsKey(subQueryId)) {
+            allocated.get(subQueryId).add(container);
+          } else {
+            allocated.put(subQueryId, Lists.newArrayList(container));
+          }
+        }
+      }
+
+      for (Entry<SubQueryId, List<Container>> entry : allocated.entrySet()) {
+        eventHandler.handle(new SubQueryContainerAllocationEvent(entry.getKey(), entry.getValue()));
+      }
+    }
+  }
+
+  private static boolean isRunningState(SubQueryState state) {
+    return state == SubQueryState.INIT || state == SubQueryState.NEW ||
+        state == SubQueryState.CONTAINER_ALLOCATED || state == SubQueryState.RUNNING;
+  }
+
+  @Override
+  public void handle(ContainerAllocationEvent event) {
+
+    if (event.getType() == ContainerAllocatorEventType.CONTAINER_REQ) {
+      LOG.info(event);
+      subQueryMap.put(event.getPriority(), event.getSubQueryId());
+      addContainerRequest(new ContainerRequest(event.getCapability(), null, null,
+          event.getPriority(), event.getRequiredNum()));
+
+    } else if (event.getType() == ContainerAllocatorEventType.CONTAINER_DEALLOCATE) {
+      LOG.info(event);
+    } else {
+      LOG.info(event);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/net/CachedDNSResolver.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/net/CachedDNSResolver.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/net/CachedDNSResolver.java
new file mode 100644
index 0000000..2a53c47
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/net/CachedDNSResolver.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.net;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Resolves host names to textual IP addresses and remembers every successful
+ * lookup in a process-wide cache, so that a given name is resolved through
+ * {@link InetAddress} at most once per successful result.
+ */
+public class CachedDNSResolver {
+  private static final Map<String, String> hostNameToIPAddrMap
+      = new ConcurrentHashMap<String, String>();
+
+  /**
+   * Resolves a single host name.
+   *
+   * @param hostName the host name or literal IP address to resolve; may be null
+   * @return the textual IP address, or null if {@code hostName} is null or
+   *         cannot be resolved
+   */
+  public static String resolve(String hostName) {
+    // ConcurrentHashMap rejects null keys, so a null name can never be cached.
+    if (hostName == null) {
+      return null;
+    }
+
+    String cached = hostNameToIPAddrMap.get(hostName);
+    if (cached != null) {
+      return cached;
+    }
+
+    try {
+      String ipAddress = InetAddress.getByName(hostName).getHostAddress();
+      hostNameToIPAddrMap.put(hostName, ipAddress);
+      return ipAddress;
+    } catch (UnknownHostException e) {
+      // ConcurrentHashMap also rejects null values, so a failed lookup is
+      // reported and left uncached; a later call will try again.
+      e.printStackTrace();
+      return null;
+    }
+  }
+
+  /**
+   * Resolves every host name of the given array.
+   *
+   * @param hostNames the names to resolve; may be null
+   * @return null if {@code hostNames} is null; otherwise an array of the same
+   *         length holding, per position, the result of
+   *         {@link #resolve(String)}
+   */
+  public static String [] resolve(String [] hostNames) {
+    if (hostNames == null) {
+      return null;
+    }
+
+    String [] resolved = new String[hostNames.length];
+    for (int i = 0; i < hostNames.length; i++) {
+      resolved[i] = resolve(hostNames[i]);
+    }
+    return resolved;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/ScheduleTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/ScheduleTaskEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/ScheduleTaskEvent.java
new file mode 100644
index 0000000..e1e074d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/ScheduleTaskEvent.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.scheduler.event;
+
+import org.apache.tajo.QueryUnitAttemptId;
+
+/**
+ * A {@link SchedulerEventType#SCHEDULE} event that carries the id of one
+ * query unit attempt.
+ */
+public class ScheduleTaskEvent extends SchedulerEvent {
+  /** Id of the attempt this event refers to. */
+  private final QueryUnitAttemptId attemptId;
+
+  /**
+   * Creates a SCHEDULE event for the given attempt.
+   *
+   * @param id the id of the query unit attempt
+   */
+  public ScheduleTaskEvent(QueryUnitAttemptId id) {
+    super(SchedulerEventType.SCHEDULE);
+    attemptId = id;
+  }
+
+  /**
+   * @return the attempt id passed to the constructor
+   */
+  public QueryUnitAttemptId getTaskAttemptId() {
+    return this.attemptId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEvent.java
new file mode 100644
index 0000000..664a2d6
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEvent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.scheduler.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Base class of scheduler events; an {@link AbstractEvent} whose type is a
+ * {@link SchedulerEventType}.
+ */
+public class SchedulerEvent extends AbstractEvent<SchedulerEventType> {
+
+  /**
+   * Creates an event of the given type.
+   *
+   * @param schedulerEventType the type handed to the {@link AbstractEvent}
+   *                           constructor
+   */
+  public SchedulerEvent(SchedulerEventType schedulerEventType) {
+    super(schedulerEventType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEventType.java
new file mode 100644
index 0000000..2e49c47
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEventType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.scheduler.event;
+
+/**
+ * The kinds of events understood by the scheduler.
+ */
+public enum SchedulerEventType {
+  /** Type used by {@code ScheduleTaskEvent}. */
+  SCHEDULE,
+
+  /** Presumably asks for an attempt to be scheduled again -- TODO confirm against the event handlers. */
+  RESCHEDULE
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/GeoUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/GeoUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/GeoUtil.java
new file mode 100644
index 0000000..6028725
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/GeoUtil.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import com.maxmind.geoip.LookupService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+
+import java.io.IOException;
+
+/**
+ * Country lookups backed by a GeoIP {@link LookupService}. The service is
+ * opened once, when this class is loaded, from the location stored in the
+ * {@code GEOIP_DATA} configuration variable.
+ */
+public class GeoUtil {
+  private static final Log LOG = LogFactory.getLog(GeoUtil.class);
+
+  /** Remains null when the GeoIP data could not be opened at class load. */
+  private static LookupService lookup;
+
+  static {
+    try {
+      TajoConf conf = new TajoConf();
+      lookup = new LookupService(conf.getVar(ConfVars.GEOIP_DATA),
+          LookupService.GEOIP_MEMORY_CACHE);
+    } catch (IOException e) {
+      LOG.error("Cannot open the geoip data", e);
+    }
+  }
+
+  /**
+   * Returns the country code the lookup service reports for a host.
+   *
+   * @param host the host to look up, passed unchanged to the lookup service
+   * @return the code of the country returned by the lookup service
+   * @throws IllegalStateException if the GeoIP data could not be opened when
+   *         this class was loaded
+   */
+  public static String getCountryCode(String host) {
+    // Fail with an explanatory message instead of a bare NullPointerException
+    // when the static initializer was unable to open the data file.
+    if (lookup == null) {
+      throw new IllegalStateException(
+          "GeoIP data is not available; it could not be opened at startup");
+    }
+    return lookup.getCountry(host).getCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
new file mode 100644
index 0000000..883725c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import com.google.gson.Gson;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.eval.ConstEval;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalNode.Type;
+import org.apache.tajo.engine.eval.EvalNodeVisitor;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.engine.planner.logical.IndexScanNode;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.storage.Fragment;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+
+public class IndexUtil {
+  public static String getIndexNameOfFrag(Fragment fragment, SortSpec[] keys) {
+    StringBuilder builder = new StringBuilder(); 
+    builder.append(fragment.getPath().getName() + "_");
+    builder.append(fragment.getStartOffset() + "_" + fragment.getLength() + "_");
+    for(int i = 0 ; i < keys.length ; i ++) {
+      builder.append(keys[i].getSortKey().getColumnName()+"_");
+    }
+    builder.append("_index");
+    return builder.toString();
+       
+  }
+  
+  public static String getIndexName (String indexName , SortSpec[] keys) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(indexName + "_");
+    for(int i = 0 ; i < keys.length ; i ++) {
+      builder.append(keys[i].getSortKey().getColumnName() + "_");
+    }
+    return builder.toString();
+  }
+  
+  public static IndexScanNode indexEval( ScanNode scanNode, 
+      Iterator<Entry<String, String>> iter ) {
+   
+    EvalNode qual = scanNode.getQual();
+    Gson gson = GsonCreator.getInstance();
+    
+    FieldAndValueFinder nodeFinder = new FieldAndValueFinder();
+    qual.preOrder(nodeFinder);
+    LinkedList<EvalNode> nodeList = nodeFinder.getNodeList();
+    
+    int maxSize = Integer.MIN_VALUE;
+    SortSpec[] maxIndex = null;
+    
+    String json;
+    while(iter.hasNext()) {
+      Entry<String , String> entry = iter.next();
+      json = entry.getValue();
+      SortSpec[] sortKey = gson.fromJson(json, SortSpec[].class);
+      if(sortKey.length > nodeList.size()) {
+        /* If the number of the sort key is greater than where condition, 
+         * this index cannot be used
+         * */
+        continue; 
+      } else {
+        boolean[] equal = new boolean[sortKey.length];
+        for(int i = 0 ; i < sortKey.length ; i ++) {
+          for(int j = 0 ; j < nodeList.size() ; j ++) {
+            Column col = ((FieldEval)(nodeList.get(j).getLeftExpr())).getColumnRef();
+            if(col.equals(sortKey[i].getSortKey())) {
+              equal[i] = true;
+            }
+          }
+        }
+        boolean chk = true;
+        for(int i = 0 ; i < equal.length ; i ++) {
+          chk = chk && equal[i];
+        }
+        if(chk) {
+          if(maxSize < sortKey.length) {
+            maxSize = sortKey.length;
+            maxIndex = sortKey;
+          }
+        }
+      }
+    }
+    if(maxIndex == null) {
+      return null;
+    } else {
+      Schema keySchema = new Schema();
+      for(int i = 0 ; i < maxIndex.length ; i ++ ) {
+        keySchema.addColumn(maxIndex[i].getSortKey());
+      }
+      Datum[] datum = new Datum[nodeList.size()];
+      for(int i = 0 ; i < nodeList.size() ; i ++ ) {
+        datum[i] = ((ConstEval)(nodeList.get(i).getRightExpr())).getValue();
+      }
+      
+      return new IndexScanNode(scanNode, keySchema , datum , maxIndex);
+    }
+
+  }
+  
+  
+  private static class FieldAndValueFinder implements EvalNodeVisitor {
+    private LinkedList<EvalNode> nodeList = new LinkedList<EvalNode>();
+    
+    public LinkedList<EvalNode> getNodeList () {
+      return this.nodeList;
+    }
+    
+    @Override
+    public void visit(EvalNode node) {
+      switch(node.getType()) {
+      case AND:
+        break;
+      case EQUAL:
+        if( node.getLeftExpr().getType() == Type.FIELD 
+          && node.getRightExpr().getType() == Type.CONST ) {
+          nodeList.add(node);
+        }
+        break;
+      case IS:
+        if( node.getLeftExpr().getType() == Type.FIELD 
+          && node.getRightExpr().getType() == Type.CONST) {
+          nodeList.add(node);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java
new file mode 100644
index 0000000..91fdb29
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java
@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.webapp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.Handler;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.handler.ContextHandlerCollection;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.jetty.servlet.*;
+import org.mortbay.jetty.webapp.WebAppContext;
+import org.mortbay.thread.QueuedThreadPool;
+import org.mortbay.util.MultiException;
+
+import javax.servlet.http.HttpServlet;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.BindException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is borrowed from Hadoop and is simplified to our objective.
+ */
+public class HttpServer {
+  private static final Log LOG = LogFactory.getLog(HttpServer.class);
+
+  protected final Server webServer;
+  protected final Connector listener;
+  protected final WebAppContext webAppContext;
+  protected final boolean findPort;
+  protected final Map<Context, Boolean> defaultContexts = 
+      new HashMap<Context, Boolean>();
+  protected final List<String> filterNames = new ArrayList<String>();
+  private static final int MAX_RETRIES = 10;
+  private final boolean listenerStartedExternally;
+  static final String STATE_DESCRIPTION_ALIVE = " - alive";
+  static final String STATE_DESCRIPTION_NOT_LIVE = " - not live";
+
+  /**
+   * Builds the Jetty server: attaches the listener, installs the thread pool,
+   * mounts the web application found under {@code webapps/<name>} at "/" and
+   * registers the default contexts.
+   *
+   * @param name the web application name; also its directory under webapps/
+   * @param bindAddress the host to bind to, used only when no connector is given
+   * @param port the port to bind to, used only when no connector is given
+   * @param findPort whether {@link #start()} may move to the next port when
+   *                 the requested one is in use
+   * @param connector a ready-made connector, or null to create the default one
+   * @param conf the configuration; "tajo.http.maxthreads" is read from it
+   * @param pathSpecs not used by this constructor
+   * @throws IOException if the web application directory cannot be located or
+   *         a default context cannot be added
+   */
+  public HttpServer(String name, String bindAddress, int port,
+      boolean findPort, Connector connector, Configuration conf,
+      String[] pathSpecs) throws IOException {
+    this.webServer = new Server();
+    this.findPort = findPort;
+
+    // A connector supplied by the caller is used untouched; otherwise the
+    // base listener is created here and bound to the requested address.
+    this.listenerStartedExternally = connector != null;
+    if (connector != null) {
+      this.listener = connector;
+    } else {
+      this.listener = createBaseListener(conf);
+      this.listener.setHost(bindAddress);
+      this.listener.setPort(port);
+    }
+    webServer.addConnector(listener);
+
+    // -1 means "not configured": let QueuedThreadPool pick its own default.
+    int maxThreads = conf.getInt("tajo.http.maxthreads", -1);
+    QueuedThreadPool threadPool;
+    if (maxThreads == -1) {
+      threadPool = new QueuedThreadPool();
+    } else {
+      threadPool = new QueuedThreadPool(maxThreads);
+    }
+    webServer.setThreadPool(threadPool);
+
+    final String appDir = getWebAppsPath(name);
+    ContextHandlerCollection contexts = new ContextHandlerCollection();
+    webServer.setHandler(contexts);
+
+    this.webAppContext = new WebAppContext();
+    this.webAppContext.setDisplayName(name);
+    this.webAppContext.setContextPath("/");
+    this.webAppContext.setWar(appDir + "/" + name);
+    webServer.addHandler(this.webAppContext);
+
+    addDefaultApps(contexts, appDir, conf);
+  }
+
+  /**
+   * Create a required listener for the Jetty instance listening on the port
+   * provided. This wrapper and all subclasses must create at least one
+   * listener.
+   */
+  public Connector createBaseListener(Configuration conf) throws IOException {
+    return HttpServer.createDefaultChannelConnector();
+  }
+
+  /**
+   * Creates a {@link SelectChannelConnector} with a low-resource max idle
+   * time of 10000, an accept queue size of 128, name resolution disabled and
+   * direct buffers disabled.
+   *
+   * @return the configured connector
+   */
+  static Connector createDefaultChannelConnector() {
+    SelectChannelConnector connector = new SelectChannelConnector();
+    connector.setLowResourceMaxIdleTime(10000);
+    connector.setAcceptQueueSize(128);
+    connector.setResolveNames(false);
+    connector.setUseDirectBuffers(false);
+    return connector;
+  }
+
+  /**
+   * Add default apps.
+   * 
+   * @param appDir
+   *          The application directory
+   * @throws IOException
+   */
+  protected void addDefaultApps(ContextHandlerCollection parent,
+      final String appDir, Configuration conf) throws IOException {
+    // set up the context for "/logs/" if "hadoop.log.dir" property is defined.
+    String logDir = System.getProperty("tajo.log.dir");
+    if (logDir != null) {
+      Context logContext = new Context(parent, "/logs");
+      logContext.setResourceBase(logDir);
+      //logContext.addServlet(AdminAuthorizedServlet.class, "/*");
+      logContext.setDisplayName("logs");
+      defaultContexts.put(logContext, true);
+    }
+    // set up the context for "/static/*"
+    Context staticContext = new Context(parent, "/static");
+    staticContext.setResourceBase(appDir + "/static");
+    staticContext.addServlet(DefaultServlet.class, "/*");
+    staticContext.setDisplayName("static");
+    defaultContexts.put(staticContext, true);
+  }
+  
+  /**
+   * Registers a context as a handler of the web server and records its
+   * filtering flag.
+   *
+   * @param ctxt the context to add
+   * @param isFiltered the flag stored for this context in the default
+   *                   contexts map
+   * @throws IOException declared for overriding implementations
+   */
+  public void addContext(Context ctxt, boolean isFiltered)
+      throws IOException {
+    this.webServer.addHandler(ctxt);
+    this.defaultContexts.put(ctxt, isFiltered);
+  }
+  
+  /**
+   * Add a context
+   * @param pathSpec The path spec for the context
+   * @param dir The directory containing the context
+   * @param isFiltered if true, the servlet is added to the filter path mapping
+   * @throws IOException if the context cannot be added
+   * @throws RuntimeException if the web server has no handler at all
+   */
+  protected void addContext(String pathSpec, String dir, boolean isFiltered) throws IOException {
+    if (0 == webServer.getHandlers().length) {
+      throw new RuntimeException("Couldn't find handler");
+    }
+    WebAppContext webAppCtx = new WebAppContext();
+    webAppCtx.setContextPath(pathSpec);
+    webAppCtx.setWar(dir);
+    // Honor the caller's choice; the flag used to be hard-coded to true,
+    // which made the isFiltered argument meaningless.
+    addContext(webAppCtx, isFiltered);
+  }
+  
+  /**
+   * Stores an attribute in the webapp context, where jsp pages can read it
+   * back with "application.getAttribute(name)".
+   *
+   * @param name the attribute name
+   * @param value the attribute value
+   */
+  public void setAttribute(String name, Object value) {
+    this.webAppContext.setAttribute(name, value);
+  }
+  
+  /**
+   * Adds a servlet to the server: registers it without authentication and
+   * then maps every known filter onto its path spec.
+   *
+   * @param name the servlet name (may be null)
+   * @param pathSpec the path spec the servlet is mounted on
+   * @param clazz the servlet class
+   */
+  public void addServlet(String name, String pathSpec,
+      Class<? extends HttpServlet> clazz) {
+    final boolean requireAuth = false;
+    addInternalServlet(name, pathSpec, clazz, requireAuth);
+    addFilterPathMapping(pathSpec, webAppContext);
+  }
+  
+  /**
+   * Adds an internal servlet, optionally protected by the Kerberos filter.
+   * Servlets added through this method are meant for internal communication
+   * rather than user-facing functionality; no filter other than the
+   * Kerberos one is mapped onto them here.
+   *
+   * @param name the servlet name (may be null)
+   * @param pathSpec the path spec the servlet is mounted on
+   * @param clazz the servlet class
+   * @param requireAuth if true, and security is enabled, the "krb5Filter"
+   *                    filter is mapped onto {@code pathSpec}
+   */
+  public void addInternalServlet(String name, String pathSpec,
+      Class<? extends HttpServlet> clazz, boolean requireAuth) {
+    ServletHolder servlet = new ServletHolder(clazz);
+    if (name != null) {
+      servlet.setName(name);
+    }
+    webAppContext.addServlet(servlet, pathSpec);
+
+    // Nothing more to do unless authentication is both requested and enabled.
+    if (!requireAuth || !UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+
+    LOG.info("Adding Kerberos filter to " + name);
+    FilterMapping kerberosMapping = new FilterMapping();
+    ServletHandler servletHandler = webAppContext.getServletHandler();
+    kerberosMapping.setPathSpec(pathSpec);
+    kerberosMapping.setFilterName("krb5Filter");
+    kerberosMapping.setDispatches(Handler.ALL);
+    servletHandler.addFilterMapping(kerberosMapping);
+  }
+  
+  /**
+   * Maps every filter listed in {@code filterNames} onto the given path spec,
+   * for all dispatch types.
+   *
+   * @param pathSpec the path spec to filter
+   * @param webAppCtx the context whose servlet handler receives the mappings
+   */
+  protected void addFilterPathMapping(String pathSpec,
+      Context webAppCtx) {
+    ServletHandler servletHandler = webAppCtx.getServletHandler();
+    for (String filterName : filterNames) {
+      FilterMapping mapping = new FilterMapping();
+      mapping.setPathSpec(pathSpec);
+      mapping.setFilterName(filterName);
+      mapping.setDispatches(Handler.ALL);
+      servletHandler.addFilterMapping(mapping);
+    }
+  }
+  
+  protected String getWebAppsPath(String appName) throws FileNotFoundException {
+    URL url = getClass().getClassLoader().getResource("webapps/" + appName);
+    if (url == null) {
+      throw new FileNotFoundException("webapps/" + appName
+          + " not found in CLASSPATH");
+    }
+    String urlString = url.toString();
+    return urlString.substring(0, urlString.lastIndexOf('/'));
+  }
+  
+  /**
+   * Reads an attribute back from the webapp context.
+   *
+   * @param name the attribute name
+   * @return whatever the webapp context holds under that name
+   */
+  public Object getAttribute(String name) {
+    return this.webAppContext.getAttribute(name);
+  }
+  
+  /**
+   * Get the port that the server is on
+   * @return the port
+   */
+  public int getPort() {
+    return webServer.getConnectors()[0].getLocalPort();
+  }
+
+  /**
+   * Sets the minimum and maximum number of worker threads (simultaneous
+   * connections) on the server's thread pool, which must be a
+   * {@link QueuedThreadPool}.
+   *
+   * @param min the minimum number of threads
+   * @param max the maximum number of threads
+   */
+  public void setThreads(int min, int max) {
+    QueuedThreadPool workers = (QueuedThreadPool) webServer.getThreadPool();
+    workers.setMinThreads(min);
+    workers.setMaxThreads(max);
+  }
+
+  /**
+   * Start the server. Does not wait for the server to start.
+   *
+   * When the listener was supplied by the caller it must already be open;
+   * otherwise the listener is opened here and, if the port is in use and
+   * {@code findPort} is set, the next port numbers are tried in turn.
+   *
+   * @throws IOException if the server cannot be started, if the port is in
+   *         use and {@code findPort} is false, or if a handler failed; an
+   *         {@link InterruptedIOException} is thrown (with the interrupt
+   *         status of the thread restored) when the thread is interrupted
+   */
+  public void start() throws IOException {
+    try {
+      if (listenerStartedExternally) { // Expect that listener was started
+                                       // securely
+        if (listener.getLocalPort() == -1) { // ... and verify
+          throw new Exception("Expected webserver's listener to be started "
+              + "previously but wasn't");
+        }
+        // And skip all the port rolling issues.
+        webServer.start();
+      } else {
+        int port;
+        int oriPort = listener.getPort(); // The original requested port
+        while (true) {
+          try {
+            port = webServer.getConnectors()[0].getLocalPort();
+            LOG.debug("Port returned by webServer.getConnectors()[0]."
+                + "getLocalPort() before open() is " + port
+                + ". Opening the listener on " + oriPort);
+            listener.open();
+            port = listener.getLocalPort();
+            LOG.debug("listener.getLocalPort() returned "
+                + listener.getLocalPort()
+                + " webServer.getConnectors()[0].getLocalPort() returned "
+                + webServer.getConnectors()[0].getLocalPort());
+            // Workaround to handle the problem reported in HADOOP-4744
+            if (port < 0) {
+              Thread.sleep(100);
+              int numRetries = 1;
+              while (port < 0) {
+                LOG.warn("listener.getLocalPort returned " + port);
+                if (numRetries++ > MAX_RETRIES) {
+                  throw new Exception(" listener.getLocalPort is returning "
+                      + "less than 0 even after " + numRetries + " resets");
+                }
+                for (int i = 0; i < 2; i++) {
+                  LOG.info("Retrying listener.getLocalPort()");
+                  port = listener.getLocalPort();
+                  if (port > 0) {
+                    break;
+                  }
+                  Thread.sleep(200);
+                }
+                if (port > 0) {
+                  break;
+                }
+                LOG.info("Bouncing the listener");
+                listener.close();
+                Thread.sleep(1000);
+                listener.setPort(oriPort == 0 ? 0 : (oriPort += 1));
+                listener.open();
+                Thread.sleep(100);
+                port = listener.getLocalPort();
+              }
+            } // Workaround end
+            LOG.info("Jetty bound to port " + port);
+            webServer.start();
+            break;
+          } catch (IOException ex) {
+            // if this is a bind exception,
+            // then try the next port number.
+            if (ex instanceof BindException) {
+              if (!findPort) {
+                BindException be = new BindException("Port in use: "
+                    + listener.getHost() + ":" + listener.getPort());
+                be.initCause(ex);
+                throw be;
+              }
+            } else {
+              LOG.info("HttpServer.start() threw a non Bind IOException");
+              throw ex;
+            }
+          } catch (MultiException ex) {
+            LOG.info("HttpServer.start() threw a MultiException");
+            throw ex;
+          }
+          // Only reached after a BindException with findPort enabled.
+          listener.setPort((oriPort += 1));
+        }
+      }
+      // Make sure there is no handler failures.
+      Handler[] handlers = webServer.getHandlers();
+      for (int i = 0; i < handlers.length; i++) {
+        if (handlers[i].isFailed()) {
+          throw new IOException(
+              "Problem in starting http server. Server handlers failed");
+        }
+      }
+    } catch (IOException e) {
+      throw e;
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so that callers further up the stack
+      // can still observe that the thread was interrupted.
+      Thread.currentThread().interrupt();
+      throw (IOException) new InterruptedIOException(
+          "Interrupted while starting HTTP server").initCause(e);
+    } catch (Exception e) {
+      throw new IOException("Problem starting http server", e);
+    }
+  }
+
+  /**
+   * Stops the server: closes the listener, clears and stops the webapp
+   * context, then stops the web server. Each step is attempted even if an
+   * earlier one failed; every failure is logged and collected.
+   *
+   * @throws Exception if at least one of the steps failed
+   */
+  public void stop() throws Exception {
+    MultiException exception = null;
+    try {
+      listener.close();
+    } catch (Exception e) {
+      // The message needs the trailing space so that the webapp name is not
+      // glued to the word "webapp".
+      LOG.error(
+          "Error while stopping listener for webapp "
+              + webAppContext.getDisplayName(), e);
+      exception = addMultiException(exception, e);
+    }
+
+    try {
+      // clear & stop webAppContext attributes to avoid memory leaks.
+      webAppContext.clearAttributes();
+      webAppContext.stop();
+    } catch (Exception e) {
+      LOG.error("Error while stopping web app context for webapp "
+          + webAppContext.getDisplayName(), e);
+      exception = addMultiException(exception, e);
+    }
+    try {
+      webServer.stop();
+    } catch (Exception e) {
+      LOG.error(
+          "Error while stopping web server for webapp "
+              + webAppContext.getDisplayName(), e);
+      exception = addMultiException(exception, e);
+    }
+
+    // Report the collected failures, if any, only after all steps have run.
+    if (exception != null) {
+      exception.ifExceptionThrow();
+    }
+
+  }
+
+  /**
+   * Adds an exception to a collector, creating the collector on first use.
+   *
+   * @param exception the collector so far, or null if nothing was collected
+   * @param e the exception to add
+   * @return the collector that now contains {@code e}
+   */
+  private MultiException addMultiException(MultiException exception, Exception e) {
+    MultiException collected =
+        (exception != null) ? exception : new MultiException();
+    collected.add(e);
+    return collected;
+  }
+
+  /**
+   * Delegates to {@code join()} of the underlying web server.
+   *
+   * @throws InterruptedException if the underlying join is interrupted
+   */
+  public void join() throws InterruptedException {
+    this.webServer.join();
+  }
+
+  /**
+   * Tests whether the web server is available.
+   *
+   * @return true if the web server exists and reports that it is started,
+   *         false otherwise
+   */
+  public boolean isAlive() {
+    if (webServer == null) {
+      return false;
+    }
+    return webServer.isStarted();
+  }
+
+  /**
+   * Return the host and port of the HttpServer, if live
+   * 
+   * @return the classname and any HTTP URL
+   */
+  @Override
+  public String toString() {
+    return listener != null ? ("HttpServer at http://" + listener.getHost()
+        + ":" + listener.getLocalPort() + "/" + (isAlive() ? STATE_DESCRIPTION_ALIVE
+        : STATE_DESCRIPTION_NOT_LIVE))
+        : "Inactive HttpServer";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
new file mode 100644
index 0000000..f185cc6
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.webapp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.mortbay.jetty.Connector;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.master.TajoMaster;
+
+import java.io.IOException;
+
+public class StaticHttpServer extends HttpServer {
+  private static StaticHttpServer instance = null;
+  private TajoMaster master = null;
+  
+  private StaticHttpServer(TajoMaster master , String name, String bindAddress, int port,
+      boolean findPort, Connector connector, Configuration conf,
+      String[] pathSpecs) throws IOException {
+    super( name, bindAddress, port, findPort, connector, conf, pathSpecs);
+    this.master = master;
+  }
+  public static StaticHttpServer getInstance() {
+    return instance;
+  }
+  public static StaticHttpServer getInstance( TajoMaster master, String name,
+      String bindAddress, int port, boolean findPort, Connector connector,
+      TajoConf conf,
+      String[] pathSpecs) throws IOException {
+    String addr = bindAddress;
+    if(instance == null) {
+      if(bindAddress == null || bindAddress.compareTo("") == 0) {
+        addr = conf.getVar(ConfVars.TASKRUNNER_LISTENER_ADDRESS).split(":")[0];
+      }
+      
+      instance = new StaticHttpServer(master, name, addr, port,
+          findPort, connector, conf, pathSpecs);
+      instance.setAttribute("tajo.master", master);
+      instance.setAttribute("tajo.master.addr", addr);
+      instance.setAttribute("tajo.master.conf", conf);
+      instance.setAttribute("tajo.master.starttime", System.currentTimeMillis());
+    }
+    return instance;
+  }
+  public TajoMaster getMaster() {
+    
+    return this.master;
+  }
+  public void set(String name, Object object) {
+    instance.setAttribute(name, object);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
new file mode 100644
index 0000000..4619fe4
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.*;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.Executors;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+/**
+ * Fetcher fetches data from a given uri via HTTP protocol and stores them into
+ * a specific file. It aims at asynchronous and efficient data transmit.
+ */
+public class Fetcher {
+  private final static Log LOG = LogFactory.getLog(Fetcher.class);
+
+  private final URI uri;
+  private final File file;
+
+  private final String host;
+  private int port;
+
+  /**
+   * Prepares a fetch of {@code uri} into {@code file}.
+   *
+   * A missing scheme is treated as "http" and a missing host as "localhost".
+   * When the URI carries no port, 80 is used for http and 443 for https; for
+   * any other scheme the port stays -1 and {@link #get()} will fail.
+   *
+   * @param uri  the location to fetch
+   * @param file the file the response body is written to
+   */
+  public Fetcher(URI uri, File file) {
+    this.uri = uri;
+    this.file = file;
+
+    String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
+    this.host = uri.getHost() == null ? "localhost" : uri.getHost();
+    this.port = uri.getPort();
+    if (port == -1) {
+      if (scheme.equalsIgnoreCase("http")) {
+        this.port = 80;
+      } else if (scheme.equalsIgnoreCase("https")) {
+        this.port = 443;
+      }
+    }
+  }
+
+  /**
+   * Connects to the host, sends a GET request for the URI and waits until the
+   * connection is closed.
+   *
+   * @return the target file given to the constructor
+   * @throws IOException if the connection attempt fails
+   */
+  public File get() throws IOException {
+    ClientBootstrap bootstrap = new ClientBootstrap(
+        new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+            Executors.newCachedThreadPool()));
+    // The bootstrap owns two thread pools; release them on every exit path,
+    // including unexpected runtime exceptions (e.g. an invalid port).
+    try {
+      bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
+      bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
+      ChannelPipelineFactory factory = new HttpClientPipelineFactory(file);
+      bootstrap.setPipelineFactory(factory);
+
+      ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
+
+      // Wait until the connection attempt succeeds or fails.
+      Channel channel = future.awaitUninterruptibly().getChannel();
+      if (!future.isSuccess()) {
+        throw new IOException(future.getCause());
+      }
+
+      // Use the raw (still percent-encoded) path so the request line stays
+      // valid, and fall back to "/" because an empty request target is illegal.
+      String path = uri.getRawPath();
+      if (path == null || path.isEmpty()) {
+        path = "/";
+      }
+      String query = path
+          + (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : "");
+      // Prepare the HTTP request.
+      HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query);
+      request.setHeader(HttpHeaders.Names.HOST, host);
+      LOG.info("Fetch: " + uri);
+      request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+      request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
+
+      // Send the HTTP request.
+      channel.write(request);
+
+      // Wait for the server to close the connection.
+      channel.getCloseFuture().awaitUninterruptibly();
+
+      return file;
+    } finally {
+      // Shut down executor threads to exit.
+      bootstrap.releaseExternalResources();
+    }
+  }
+
+  public URI getURI() {
+    return this.uri;
+  }
+
+  /**
+   * Writes the body of an HTTP response, chunked or not, into a file.
+   */
+  public static class HttpClientHandler extends SimpleChannelUpstreamHandler {
+    private volatile boolean readingChunks;
+    private final File file;
+    private RandomAccessFile raf;
+    private FileChannel fc;
+    // Value of the Content-Length header, or -1 if none has been seen.
+    private long length = -1;
+
+    public HttpClientHandler(File file) throws FileNotFoundException {
+      this.file = file;
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      if (!readingChunks) {
+        handleResponse((HttpResponse) e.getMessage());
+      } else {
+        handleChunk((HttpChunk) e.getMessage());
+      }
+    }
+
+    /**
+     * Closes the target file if the connection goes away while it is still
+     * open, e.g. when the peer drops the connection in the middle of a
+     * chunked transfer.
+     */
+    @Override
+    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
+        throws Exception {
+      try {
+        if (raf != null || fc != null) {
+          LOG.warn("Connection closed before the fetch completed: " + file);
+          closeFile();
+        }
+      } finally {
+        super.channelClosed(ctx, e);
+      }
+    }
+
+    /** Handles the response head and, for non-chunked responses, the body. */
+    private void handleResponse(HttpResponse response) throws IOException {
+      StringBuilder sb = new StringBuilder();
+      if (LOG.isDebugEnabled()) {
+        sb.append("STATUS: ").append(response.getStatus())
+            .append(", VERSION: ").append(response.getProtocolVersion())
+            .append(", HEADER: ");
+      }
+      for (String name : response.getHeaderNames()) {
+        for (String value : response.getHeaders(name)) {
+          if (LOG.isDebugEnabled()) {
+            sb.append(name).append(" = ").append(value);
+          }
+          // HTTP header names are case-insensitive.
+          if (this.length == -1
+              && HttpHeaders.Names.CONTENT_LENGTH.equalsIgnoreCase(name)) {
+            this.length = Long.parseLong(value);
+          }
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sb.toString());
+      }
+
+      if (HttpResponseStatus.NO_CONTENT.equals(response.getStatus())) {
+        LOG.info("There are no data corresponding to the request");
+        return;
+      }
+
+      this.raf = new RandomAccessFile(file, "rw");
+      this.fc = raf.getChannel();
+
+      if (response.isChunked()) {
+        readingChunks = true;
+        return;
+      }
+
+      // The whole body arrived with the response: write it and close the
+      // file right away, since no further message will do so.
+      long fileLength;
+      try {
+        ChannelBuffer content = response.getContent();
+        if (content.readable()) {
+          writeFully(content.toByteBuffer());
+        }
+        fileLength = fc.position();
+      } finally {
+        closeFile();
+      }
+      logFetchResult(fileLength);
+    }
+
+    /** Appends one chunk to the file, or finishes the file on the last chunk. */
+    private void handleChunk(HttpChunk chunk) throws IOException {
+      if (chunk.isLast()) {
+        readingChunks = false;
+        long fileLength = fc.position();
+        closeFile();
+        logFetchResult(fileLength);
+      } else {
+        writeFully(chunk.getContent().toByteBuffer());
+      }
+    }
+
+    /** Writes the whole buffer; a single write() may be partial. */
+    private void writeFully(ByteBuffer buffer) throws IOException {
+      while (buffer.hasRemaining()) {
+        fc.write(buffer);
+      }
+    }
+
+    /** Closes the channel and the file; safe to call more than once. */
+    private void closeFile() throws IOException {
+      FileChannel channelToClose = fc;
+      RandomAccessFile fileToClose = raf;
+      fc = null;
+      raf = null;
+      try {
+        if (channelToClose != null) {
+          channelToClose.close();
+        }
+      } finally {
+        if (fileToClose != null) {
+          fileToClose.close();
+        }
+      }
+    }
+
+    /** Logs the received size against the advertised Content-Length. */
+    private void logFetchResult(long fileLength) {
+      if (fileLength == length) {
+        LOG.info("Data fetch is done (total received bytes: " + fileLength
+            + ")");
+      } else {
+        LOG.info("Data fetch is done, but cannot get all data "
+            + "(received/total: " + fileLength + "/" + length + ")");
+      }
+    }
+  }
+
+  /**
+   * Builds the client pipeline: HTTP codec, content decompressor and a
+   * {@link HttpClientHandler} writing to the given file.
+   */
+  public static class HttpClientPipelineFactory implements
+      ChannelPipelineFactory {
+    private final File file;
+
+    public HttpClientPipelineFactory(File file) {
+      this.file = file;
+    }
+
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+      ChannelPipeline pipeline = pipeline();
+
+      pipeline.addLast("codec", new HttpClientCodec());
+      pipeline.addLast("inflater", new HttpContentDecompressor());
+      pipeline.addLast("handler", new HttpClientHandler(file));
+      return pipeline;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/InterDataRetriever.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
new file mode 100644
index 0000000..42ad875
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
+import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
+import org.apache.tajo.worker.dataserver.retriever.FileChunk;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Serves files from per-query-unit base directories. A request names the
+ * query unit with the {@code qid} parameter and the file with {@code fn};
+ * only query units that are currently registered can be served.
+ */
+@Deprecated
+public class InterDataRetriever implements DataRetriever {
+  private static final Log LOG = LogFactory.getLog(InterDataRetriever.class);
+  // Ids that are currently registered; also the lock guarding updates to
+  // both this set and the map below.
+  private final Set<QueryUnitId> registered = Sets.newHashSet();
+  // QueryUnitId.toString() -> base directory for that query unit.
+  private final Map<String, String> map = Maps.newConcurrentMap();
+
+  public InterDataRetriever() {
+  }
+
+  /**
+   * Associates {@code id} with {@code baseURI}. An id that is already
+   * registered keeps its existing mapping.
+   */
+  public void register(QueryUnitId id, String baseURI) {
+    synchronized (registered) {
+      if (!registered.contains(id)) {
+        map.put(id.toString(), baseURI);
+        registered.add(id);
+      }
+    }
+  }
+
+  /**
+   * Removes the mapping of {@code id}, if any.
+   */
+  public void unregister(QueryUnitId id) {
+    synchronized (registered) {
+      if (registered.contains(id)) {
+        map.remove(id.toString());
+        registered.remove(id);
+      }
+    }
+  }
+
+  /**
+   * Resolves the {@code qid} and {@code fn} query parameters of the request
+   * to a file below the base directory registered for {@code qid}.
+   *
+   * @return a single chunk covering the whole file
+   * @throws IllegalArgumentException if the URI has no query string or lacks
+   *         a value for {@code qid} or {@code fn}
+   * @throws FileNotFoundException if {@code qid} is not registered, or the
+   *         file is hidden or does not exist
+   * @throws FileAccessForbiddenException if the file lies outside the base
+   *         directory or is not a regular file
+   * @see org.apache.tajo.worker.dataserver.retriever.DataRetriever#handle(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.handler.codec.http.HttpRequest)
+   */
+  @Override
+  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+      throws IOException {
+
+    String uri = request.getUri();
+    int start = uri.indexOf('?');
+    if (start < 0) {
+      throw new IllegalArgumentException("Wrong request: " + uri);
+    }
+
+    String queryStr = uri.substring(start + 1);
+    LOG.info("QUERY: " + queryStr);
+    String [] queries = queryStr.split("&");
+
+    String qid = null;
+    String fn = null;
+    String [] kv;
+    for (String query : queries) {
+      kv = query.split("=");
+      if (kv.length < 2) {
+        // A key without a value ("qid" or "qid=") carries nothing usable.
+        continue;
+      }
+      if (kv[0].equals("qid")) {
+        qid = kv[1];
+      } else if (kv[0].equals("fn")) {
+        fn = kv[1];
+      }
+    }
+
+    // Both parameters are mandatory; without this check a missing qid would
+    // fail inside the map lookup and a missing fn would resolve to "null".
+    if (qid == null || fn == null) {
+      throw new IllegalArgumentException("Wrong request: " + uri);
+    }
+
+    String baseDir = map.get(qid);
+    if (baseDir == null) {
+      throw new FileNotFoundException("No such qid: " + qid);
+    }
+
+    File file = new File(baseDir + "/" + fn);
+
+    // fn comes straight from the request: refuse anything (e.g. "../..")
+    // that resolves outside of the registered base directory.
+    String basePath = new File(baseDir).getCanonicalPath();
+    String basePrefix = basePath.endsWith(File.separator)
+        ? basePath : basePath + File.separator;
+    if (!file.getCanonicalPath().startsWith(basePrefix)) {
+      throw new FileAccessForbiddenException("Access denied: " + fn);
+    }
+
+    if (file.isHidden() || !file.exists()) {
+      throw new FileNotFoundException("No such file: " + baseDir + "/"
+          + file.getName());
+    }
+    if (!file.isFile()) {
+      throw new FileAccessForbiddenException("No such file: "
+          + baseDir + "/" + file.getName());
+    }
+
+    return new FileChunk[] {new FileChunk(file, 0, file.length())};
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
new file mode 100644
index 0000000..36e7353
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.worker.dataserver.retriever.FileChunk;
+import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Resolves the {@code fn} request parameter to a file directly below a fixed
+ * base directory.
+ */
+public class PartitionRetrieverHandler implements RetrieverHandler {
+  private final String baseDir;
+
+  /**
+   * @param baseDir directory that requested file names are resolved against
+   */
+  public PartitionRetrieverHandler(String baseDir) {
+    this.baseDir = baseDir;
+  }
+
+  /**
+   * Returns a chunk covering the whole file named by the first {@code fn}
+   * value.
+   *
+   * @param kvs request parameters, keyed by name
+   * @return a chunk starting at offset 0 with the file's current length
+   * @throws IllegalArgumentException if no {@code fn} value is present
+   */
+  @Override
+  public FileChunk get(Map<String, List<String>> kvs) throws IOException {
+    List<String> fileNames = kvs.get("fn");
+    if (fileNames == null || fileNames.isEmpty()) {
+      // Fail with a descriptive message instead of a bare
+      // NullPointerException / IndexOutOfBoundsException.
+      throw new IllegalArgumentException("Missing required parameter: fn");
+    }
+
+    // nothing to verify the file because AdvancedDataRetriever checks
+    // its validity of the file.
+    File file = new File(baseDir + "/" + fileNames.get(0));
+
+    return new FileChunk(file, 0, file.length());
+  }
+}