You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/07/02 16:16:16 UTC
[22/51] [partial] TAJO-22: The package prefix should be
org.apache.tajo. (DaeMyung Kang via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
new file mode 100644
index 0000000..4934633
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tajo.QueryUnitAttemptId;
+
+/**
+ * A {@link TaskAttemptEventType#TA_ASSIGNED} event. Besides the attempt id kept by
+ * {@link TaskAttemptEvent}, it carries the container id, host name and pull server
+ * port that were handed to the constructor.
+ */
+public class TaskAttemptAssignedEvent extends TaskAttemptEvent {
+  /** Container id given at construction time. */
+  private final ContainerId cId;
+  /** Host name given at construction time. */
+  private final String hostName;
+  /** Pull server port given at construction time. */
+  private final int pullServerPort;
+
+  /**
+   * @param id             task attempt this event refers to
+   * @param cId            container id to carry
+   * @param hostname       host name to carry
+   * @param pullServerPort pull server port to carry
+   */
+  public TaskAttemptAssignedEvent(QueryUnitAttemptId id, ContainerId cId,
+                                  String hostname, int pullServerPort) {
+    super(id, TaskAttemptEventType.TA_ASSIGNED);
+    this.pullServerPort = pullServerPort;
+    this.hostName = hostname;
+    this.cId = cId;
+  }
+
+  /** @return the container id passed to the constructor */
+  public ContainerId getContainerId() {
+    return this.cId;
+  }
+
+  /** @return the host name passed to the constructor */
+  public String getHostName() {
+    return this.hostName;
+  }
+
+  /** @return the pull server port passed to the constructor */
+  public int getPullServerPort() {
+    return this.pullServerPort;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java
new file mode 100644
index 0000000..f2df144
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.QueryUnitAttemptId;
+
+/**
+ * Base class of task attempt events: an {@link AbstractEvent} typed by
+ * {@link TaskAttemptEventType} that additionally carries a
+ * {@link QueryUnitAttemptId}.
+ */
+public class TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType> {
+  /** Attempt id given at construction time. */
+  private final QueryUnitAttemptId id;
+
+  /**
+   * @param id                   task attempt this event refers to
+   * @param taskAttemptEventType event type forwarded to {@link AbstractEvent}
+   */
+  public TaskAttemptEvent(QueryUnitAttemptId id,
+                          TaskAttemptEventType taskAttemptEventType) {
+    super(taskAttemptEventType);
+    this.id = id;
+  }
+
+  /** @return the attempt id passed to the constructor */
+  public QueryUnitAttemptId getTaskAttemptId() {
+    return id;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
new file mode 100644
index 0000000..d9d2f13
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+/**
+ * Event types handled by TaskAttempt.
+ *
+ * The "Producer" comment above each group names the component that, according
+ * to the original author, emits those events; nothing in this enum enforces it.
+ * Do not reorder the constants without checking for callers that depend on
+ * their order.
+ */
+public enum TaskAttemptEventType {
+
+ // Producer: Task
+ TA_SCHEDULE,
+ TA_RESCHEDULE,
+
+ // Producer: Client, Task
+ TA_KILL,
+
+ // Producer: Scheduler
+ // Type of TaskAttemptAssignedEvent.
+ TA_ASSIGNED,
+
+ // Producer: Scheduler
+ TA_LAUNCHED,
+
+ // Producer: TaskAttemptListener
+ // TA_DONE is the type of TaskCompletionEvent, TA_FATAL_ERROR the type of
+ // TaskFatalErrorEvent and TA_UPDATE the type of TaskAttemptStatusUpdateEvent.
+ TA_DIAGNOSTICS_UPDATE,
+ TA_COMMIT_PENDING,
+ TA_DONE,
+ TA_FATAL_ERROR,
+ TA_UPDATE,
+ TA_TIMED_OUT,
+
+ // Producer: TaskCleaner
+ TA_CLEANUP_DONE,
+
+ // Producer: Job
+ TA_TOO_MANY_FETCH_FAILURE,
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
new file mode 100644
index 0000000..6409b43
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.engine.MasterWorkerProtos.TaskStatusProto;
+
+/**
+ * A {@link TaskAttemptEventType#TA_UPDATE} event that carries the
+ * {@link TaskStatusProto} handed to the constructor.
+ */
+public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
+  /** Status message given at construction time. */
+  private final TaskStatusProto status;
+
+  /**
+   * @param id     task attempt this event refers to
+   * @param status status message to carry
+   */
+  public TaskAttemptStatusUpdateEvent(final QueryUnitAttemptId id,
+                                      TaskStatusProto status) {
+    super(id, TaskAttemptEventType.TA_UPDATE);
+    this.status = status;
+  }
+
+  /** @return the status message passed to the constructor */
+  public TaskStatusProto getStatus() {
+    return this.status;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
new file mode 100644
index 0000000..b36d69c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.engine.MasterWorkerProtos.TaskCompletionReport;
+
+/**
+ * A {@link TaskAttemptEventType#TA_DONE} event built from a
+ * {@link TaskCompletionReport}. The attempt id of the event is derived from
+ * {@code report.getId()}.
+ */
+public class TaskCompletionEvent extends TaskAttemptEvent {
+  // final like the fields of the sibling events: the report is assigned once in
+  // the constructor and never replaced.
+  private final TaskCompletionReport report;
+
+  /**
+   * @param report completion report to carry; must not be null because its id is
+   *               read to build the attempt id
+   */
+  public TaskCompletionEvent(TaskCompletionReport report) {
+    super(new QueryUnitAttemptId(report.getId()), TaskAttemptEventType.TA_DONE);
+    this.report = report;
+  }
+
+  /** @return the report passed to the constructor */
+  public TaskCompletionReport getReport() {
+    return report;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskEvent.java
new file mode 100644
index 0000000..234491b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.QueryUnitId;
+
+/**
+ * Base class of task events: an {@link AbstractEvent} typed by
+ * {@link TaskEventType} that additionally carries a {@link QueryUnitId}.
+ */
+public class TaskEvent extends AbstractEvent<TaskEventType> {
+  /** Task id given at construction time. */
+  private final QueryUnitId id;
+
+  /**
+   * @param id            task this event refers to
+   * @param taskEventType event type forwarded to {@link AbstractEvent}
+   */
+  public TaskEvent(QueryUnitId id, TaskEventType taskEventType) {
+    super(taskEventType);
+    this.id = id;
+  }
+
+  /** @return the task id passed to the constructor */
+  public QueryUnitId getTaskId() {
+    return this.id;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskEventType.java
new file mode 100644
index 0000000..9448863
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskEventType.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+/**
+ * Event types handled by Task.
+ *
+ * The "Producer" comment above each group names the component that, according
+ * to the original author, emits those events; nothing in this enum enforces it.
+ */
+public enum TaskEventType {
+
+ // Producer: Client, SubQuery
+ T_KILL,
+
+ // Producer: SubQuery
+ T_SCHEDULE,
+
+ // Producer: TaskAttempt
+ T_ATTEMPT_LAUNCHED,
+ T_ATTEMPT_COMMIT_PENDING,
+ T_ATTEMPT_FAILED,
+ T_ATTEMPT_SUCCEEDED,
+ T_ATTEMPT_KILLED
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
new file mode 100644
index 0000000..3d1c78d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
+
+/**
+ * A {@link TaskAttemptEventType#TA_FATAL_ERROR} event built from a
+ * {@link TaskFatalErrorReport}. The attempt id of the event is derived from
+ * {@code report.getId()}.
+ */
+public class TaskFatalErrorEvent extends TaskAttemptEvent {
+  // final like the fields of the sibling events: the report is assigned once in
+  // the constructor and never replaced.
+  private final TaskFatalErrorReport report;
+
+  /**
+   * @param report error report to carry; must not be null because its id is
+   *               read to build the attempt id
+   */
+  public TaskFatalErrorEvent(TaskFatalErrorReport report) {
+    super(new QueryUnitAttemptId(report.getId()),
+        TaskAttemptEventType.TA_FATAL_ERROR);
+    this.report = report;
+  }
+
+  /** @return the error message stored in the report */
+  public String errorMessage() {
+    return report.getErrorMessage();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
new file mode 100644
index 0000000..25a8a14
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import com.google.protobuf.RpcCallback;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
+import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType;
+
+/**
+ * An event of type {@link TaskRequestEventType#TASK_REQ} that carries a
+ * {@link ContainerId} and an {@link RpcCallback} for a
+ * {@link QueryUnitRequestProto}.
+ */
+public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
+
+  /** The only event type this class is created with. */
+  public enum TaskRequestEventType {
+    TASK_REQ
+  }
+
+  /** Container id given at construction time. */
+  private final ContainerId workerId;
+  /** Callback given at construction time. */
+  private final RpcCallback<QueryUnitRequestProto> callback;
+
+  /**
+   * @param workerId container id to carry
+   * @param callback callback to carry
+   */
+  public TaskRequestEvent(ContainerId workerId,
+                          RpcCallback<QueryUnitRequestProto> callback) {
+    super(TaskRequestEventType.TASK_REQ);
+    this.callback = callback;
+    this.workerId = workerId;
+  }
+
+  /** @return the container id passed to the constructor */
+  public ContainerId getContainerId() {
+    return workerId;
+  }
+
+  /** @return the callback passed to the constructor */
+  public RpcCallback<QueryUnitRequestProto> getCallback() {
+    return callback;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
new file mode 100644
index 0000000..03b2aba
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.QueryUnitAttemptId;
+
+import java.util.Arrays;
+
+/**
+ * A {@link TaskSchedulerEvent} for a single task attempt. It carries the attempt
+ * id, a leaf-query flag and the host and rack arrays handed to the constructor;
+ * the sub query id of the parent event is taken from the attempt id.
+ */
+public class TaskScheduleEvent extends TaskSchedulerEvent {
+  private final QueryUnitAttemptId attemptId;
+  private final boolean isLeafQuery;
+  // The arrays are stored and returned as-is (no defensive copy).
+  private final String[] hosts;
+  private final String[] racks;
+
+  /**
+   * @param attemptId   task attempt to schedule; also supplies the sub query id
+   * @param eventType   event type forwarded to {@link TaskSchedulerEvent}
+   * @param isLeafQuery leaf-query flag to carry
+   * @param hosts       host names to carry, may be null
+   * @param racks       rack names to carry, may be null
+   */
+  public TaskScheduleEvent(final QueryUnitAttemptId attemptId,
+                           final EventType eventType, boolean isLeafQuery,
+                           final String[] hosts,
+                           final String[] racks) {
+    super(eventType, attemptId.getSubQueryId());
+    this.racks = racks;
+    this.hosts = hosts;
+    this.isLeafQuery = isLeafQuery;
+    this.attemptId = attemptId;
+  }
+
+  /** @return the attempt id passed to the constructor */
+  public QueryUnitAttemptId getAttemptId() {
+    return attemptId;
+  }
+
+  /** @return the leaf-query flag passed to the constructor */
+  public boolean isLeafQuery() {
+    return isLeafQuery;
+  }
+
+  /** @return the host array passed to the constructor (same instance) */
+  public String[] getHosts() {
+    return hosts;
+  }
+
+  /** @return the rack array passed to the constructor (same instance) */
+  public String[] getRacks() {
+    return racks;
+  }
+
+  @Override
+  public String toString() {
+    // Arrays.toString prints "null" for a null array and "[a, b]" otherwise,
+    // which is exactly what the list-based rendering produces.
+    StringBuilder text = new StringBuilder("TaskScheduleEvent{");
+    text.append("attemptId=").append(attemptId);
+    text.append(", isLeafQuery=").append(isLeafQuery);
+    text.append(", hosts=").append(Arrays.toString(hosts));
+    text.append(", racks=").append(Arrays.toString(racks));
+    text.append('}');
+    return text.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
new file mode 100644
index 0000000..d73bb87
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.SubQueryId;
+import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+
+/**
+ * Base class of task scheduler events: an {@link AbstractEvent} typed by
+ * {@link EventType} that additionally carries a {@link SubQueryId}.
+ */
+public class TaskSchedulerEvent extends AbstractEvent<EventType> {
+  /** Types of events this class can be created with. */
+  public enum EventType {
+    T_SCHEDULE,
+    T_SUBQUERY_COMPLETED
+  }
+
+  /** Sub query id given at construction time. */
+  private final SubQueryId subQueryId;
+
+  /**
+   * @param eventType  event type forwarded to {@link AbstractEvent}
+   * @param subQueryId sub query this event refers to
+   */
+  public TaskSchedulerEvent(EventType eventType, SubQueryId subQueryId) {
+    super(eventType);
+    this.subQueryId = subQueryId;
+  }
+
+  /** @return the sub query id passed to the constructor */
+  public SubQueryId getSubQueryId() {
+    return subQueryId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java
new file mode 100644
index 0000000..28654f0
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.QueryUnitAttemptId;
+
+/**
+ * A {@link TaskEvent} that refers to one attempt of the task. The task id of the
+ * parent event is taken from {@code attemptId.getQueryUnitId()}.
+ */
+public class TaskTAttemptEvent extends TaskEvent {
+  /** Attempt id given at construction time. */
+  private final QueryUnitAttemptId attemptId;
+
+  /**
+   * @param attemptId attempt this event refers to; also supplies the task id
+   * @param eventType event type forwarded to {@link TaskEvent}
+   */
+  public TaskTAttemptEvent(QueryUnitAttemptId attemptId,
+                           TaskEventType eventType) {
+    super(attemptId.getQueryUnitId(), eventType);
+    this.attemptId = attemptId;
+  }
+
+  /** @return the attempt id passed to the constructor */
+  public QueryUnitAttemptId getTaskAttemptId() {
+    return this.attemptId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
new file mode 100644
index 0000000..c615532
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.AMRMClientImpl;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.tajo.SubQueryId;
+import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.master.QueryMaster.QueryContext;
+import org.apache.tajo.master.SubQueryState;
+import org.apache.tajo.master.event.ContainerAllocationEvent;
+import org.apache.tajo.master.event.ContainerAllocatorEventType;
+import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class RMContainerAllocator extends AMRMClientImpl
+ implements EventHandler<ContainerAllocationEvent> {
+
+ /** Class Logger */
+ private static final Log LOG = LogFactory.getLog(RMContainerAllocator.
+ class.getName());
+
+ private QueryContext context;
+ private final EventHandler eventHandler;
+
+ public RMContainerAllocator(QueryContext context) {
+ super(context.getApplicationAttemptId());
+ this.context = context;
+ this.eventHandler = context.getDispatcher().getEventHandler();
+ }
+
+ public void init(Configuration conf) {
+ super.init(conf);
+ }
+
+ private static final int WAIT_INTERVAL_AVAILABLE_NODES = 500; // 0.5 second
+ public void start() {
+ super.start();
+
+ RegisterApplicationMasterResponse response;
+ try {
+ response = registerApplicationMaster("locahost", 10080, "http://localhost:1234");
+ context.setMaxContainerCapability(response.getMaximumResourceCapability().getMemory());
+ context.setMinContainerCapability(response.getMinimumResourceCapability().getMemory());
+
+ // If the number of cluster nodes is ZERO, it waits for available nodes.
+ AllocateResponse allocateResponse = allocate(0.0f);
+ while(allocateResponse.getNumClusterNodes() < 1) {
+ try {
+ Thread.sleep(WAIT_INTERVAL_AVAILABLE_NODES);
+ LOG.info("Waiting for Available Cluster Nodes");
+ allocateResponse = allocate(0);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ context.setNumClusterNodes(allocateResponse.getNumClusterNodes());
+ } catch (YarnRemoteException e) {
+ LOG.error(e);
+ }
+
+ startAllocatorThread();
+ }
+
+ protected Thread allocatorThread;
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
+ private int rmPollInterval = 1000;//millis
+ protected void startAllocatorThread() {
+ allocatorThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+ try {
+ try {
+ heartbeat();
+ } catch (YarnException e) {
+ LOG.error("Error communicating with RM: " + e.getMessage() , e);
+ return;
+ } catch (Exception e) {
+ LOG.error("ERROR IN CONTACTING RM. ", e);
+ // TODO: for other exceptions
+ }
+ Thread.sleep(rmPollInterval);
+ } catch (InterruptedException e) {
+ if (!stopped.get()) {
+ LOG.warn("Allocated thread interrupted. Returning.");
+ }
+ return;
+ }
+ }
+ }
+ });
+ allocatorThread.setName("RMContainerAllocator");
+ allocatorThread.start();
+ }
+
+ public void stop() {
+ stopped.set(true);
+ super.stop();
+ FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
+ QueryState state = context.getQuery().getState();
+ if (state == QueryState.QUERY_SUCCEEDED) {
+ finishState = FinalApplicationStatus.SUCCEEDED;
+ } else if (state == QueryState.QUERY_KILLED
+ || (state == QueryState.QUERY_RUNNING)) {
+ finishState = FinalApplicationStatus.KILLED;
+ } else if (state == QueryState.QUERY_FAILED
+ || state == QueryState.QUERY_ERROR) {
+ finishState = FinalApplicationStatus.FAILED;
+ }
+
+ try {
+ unregisterApplicationMaster(finishState, "", "http://localhost:1234");
+ } catch (YarnRemoteException e) {
+ LOG.error(e);
+ }
+ }
+
+ private final Map<Priority, SubQueryId> subQueryMap =
+ new HashMap<Priority, SubQueryId>();
+
+ public void heartbeat() throws Exception {
+ AllocateResponse allocateResponse = allocate(context.getProgress());
+ AMResponse response = allocateResponse.getAMResponse();
+ List<Container> allocatedContainers = response.getAllocatedContainers();
+
+ LOG.info("Available Cluster Nodes: " + allocateResponse.getNumClusterNodes());
+ LOG.info("Available Resource: " + response.getAvailableResources());
+ LOG.info("Num of Allocated Containers: " + response.getAllocatedContainers().size());
+ if (response.getAllocatedContainers().size() > 0) {
+ LOG.info("================================================================");
+ for (Container container : response.getAllocatedContainers()) {
+ LOG.info("> Container Id: " + container.getId());
+ LOG.info("> Node Id: " + container.getNodeId());
+ LOG.info("> Resource (Mem): " + container.getResource().getMemory());
+ LOG.info("> State : " + container.getState());
+ LOG.info("> Priority: " + container.getPriority());
+ }
+ LOG.info("================================================================");
+ }
+
+ Map<SubQueryId, List<Container>> allocated = new HashMap<SubQueryId, List<Container>>();
+ if (allocatedContainers.size() > 0) {
+ for (Container container : allocatedContainers) {
+ SubQueryId subQueryId = subQueryMap.get(container.getPriority());
+ SubQueryState state = context.getSubQuery(subQueryId).getState();
+ if (!(isRunningState(state) && subQueryMap.containsKey(container.getPriority()))) {
+ releaseAssignedContainer(container.getId());
+ synchronized (subQueryMap) {
+ subQueryMap.remove(container.getPriority());
+ }
+ } else {
+ if (allocated.containsKey(subQueryId)) {
+ allocated.get(subQueryId).add(container);
+ } else {
+ allocated.put(subQueryId, Lists.newArrayList(container));
+ }
+ }
+ }
+
+ for (Entry<SubQueryId, List<Container>> entry : allocated.entrySet()) {
+ eventHandler.handle(new SubQueryContainerAllocationEvent(entry.getKey(), entry.getValue()));
+ }
+ }
+ }
+
+ private static boolean isRunningState(SubQueryState state) {
+ return state == SubQueryState.INIT || state == SubQueryState.NEW ||
+ state == SubQueryState.CONTAINER_ALLOCATED || state == SubQueryState.RUNNING;
+ }
+
+ @Override
+ public void handle(ContainerAllocationEvent event) {
+
+ if (event.getType() == ContainerAllocatorEventType.CONTAINER_REQ) {
+ LOG.info(event);
+ subQueryMap.put(event.getPriority(), event.getSubQueryId());
+ addContainerRequest(new ContainerRequest(event.getCapability(), null, null,
+ event.getPriority(), event.getRequiredNum()));
+
+ } else if (event.getType() == ContainerAllocatorEventType.CONTAINER_DEALLOCATE) {
+ LOG.info(event);
+ } else {
+ LOG.info(event);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/net/CachedDNSResolver.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/net/CachedDNSResolver.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/net/CachedDNSResolver.java
new file mode 100644
index 0000000..2a53c47
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/net/CachedDNSResolver.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.net;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Resolves host names to IP address strings and remembers successful
+ * lookups in a process-wide map so that each name is resolved only once.
+ */
+public class CachedDNSResolver {
+  /** Host name to IP address; holds successful lookups only. */
+  private static final Map<String, String> hostNameToIPAddrMap
+      = new ConcurrentHashMap<String, String>();
+
+  /**
+   * Resolves a single host name.
+   *
+   * @param hostName the host name (or IP literal) to resolve; may be null
+   * @return the IP address string, or null if the name is null or cannot
+   *         be resolved
+   */
+  public static String resolve(String hostName) {
+    if (hostName == null) {
+      return null;
+    }
+
+    String cached = hostNameToIPAddrMap.get(hostName);
+    if (cached != null) {
+      return cached;
+    }
+
+    try {
+      String ipAddress = InetAddress.getByName(hostName).getHostAddress();
+      hostNameToIPAddrMap.put(hostName, ipAddress);
+      return ipAddress;
+    } catch (UnknownHostException e) {
+      // ConcurrentHashMap does not accept null values, so a failed lookup
+      // is reported to the caller as null and is not cached.
+      e.printStackTrace();
+      return null;
+    }
+  }
+
+  /**
+   * Resolves every host name of an array.
+   *
+   * @param hostNames the host names to resolve; may be null
+   * @return an array of the same length whose i-th entry is the result of
+   *         {@link #resolve(String)} for the i-th name, or null if the
+   *         given array is null
+   */
+  public static String [] resolve(String [] hostNames) {
+    if (hostNames == null) {
+      return null;
+    }
+
+    String [] resolved = new String[hostNames.length];
+    for (int i = 0; i < hostNames.length; i++) {
+      resolved[i] = resolve(hostNames[i]);
+    }
+    return resolved;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/ScheduleTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/ScheduleTaskEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/ScheduleTaskEvent.java
new file mode 100644
index 0000000..e1e074d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/ScheduleTaskEvent.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.scheduler.event;
+
+import org.apache.tajo.QueryUnitAttemptId;
+
+/**
+ * Scheduler event of type {@link SchedulerEventType#SCHEDULE} that carries
+ * the id of a query unit attempt.
+ */
+public class ScheduleTaskEvent extends SchedulerEvent {
+  /** Id of the attempt this event refers to. */
+  private final QueryUnitAttemptId taskAttemptId;
+
+  /**
+   * Creates a SCHEDULE event for the given attempt.
+   *
+   * @param id the id of the query unit attempt carried by this event
+   */
+  public ScheduleTaskEvent(QueryUnitAttemptId id) {
+    super(SchedulerEventType.SCHEDULE);
+    taskAttemptId = id;
+  }
+
+  /**
+   * @return the attempt id given at construction time
+   */
+  public QueryUnitAttemptId getTaskAttemptId() {
+    return this.taskAttemptId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEvent.java
new file mode 100644
index 0000000..664a2d6
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEvent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.scheduler.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Base class of scheduler events: an {@link AbstractEvent} whose type is a
+ * {@link SchedulerEventType}.
+ */
+public class SchedulerEvent extends AbstractEvent<SchedulerEventType> {
+
+  /**
+   * Creates an event of the given type.
+   *
+   * @param schedulerEventType the type handed to the {@link AbstractEvent}
+   *                           constructor
+   */
+  public SchedulerEvent(SchedulerEventType schedulerEventType) {
+    super(schedulerEventType);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEventType.java
new file mode 100644
index 0000000..2e49c47
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEventType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.scheduler.event;
+
+/**
+ * Types of the events handled by the scheduler.
+ */
+public enum SchedulerEventType {
+  /** Type of a scheduling request event. */
+  SCHEDULE,
+
+  /**
+   * Presumably a request to schedule a task again; confirm against the
+   * scheduler that consumes it.
+   */
+  RESCHEDULE
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/GeoUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/GeoUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/GeoUtil.java
new file mode 100644
index 0000000..6028725
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/GeoUtil.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import com.maxmind.geoip.LookupService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+
+import java.io.IOException;
+
+/**
+ * Country lookup helper backed by a GeoIP {@link LookupService}. The service
+ * is opened once, when this class is loaded, from the data file named by
+ * {@link ConfVars#GEOIP_DATA}.
+ */
+public class GeoUtil {
+  private static final Log LOG = LogFactory.getLog(GeoUtil.class);
+
+  /** Stays null when the geoip data could not be opened at class load time. */
+  private static LookupService lookup;
+
+  static {
+    try {
+      TajoConf conf = new TajoConf();
+      lookup = new LookupService(conf.getVar(ConfVars.GEOIP_DATA),
+          LookupService.GEOIP_MEMORY_CACHE);
+    } catch (IOException e) {
+      LOG.error("Cannot open the geoip data", e);
+    }
+  }
+
+  /**
+   * Returns the country code that the lookup service reports for a host.
+   *
+   * @param host the host to look up
+   * @return the code of the country found for the host
+   * @throws IllegalStateException if the geoip data could not be opened when
+   *         this class was loaded
+   */
+  public static String getCountryCode(String host) {
+    if (lookup == null) {
+      // Fail with an explanation instead of a bare NullPointerException.
+      throw new IllegalStateException(
+          "GeoIP lookup is unavailable because the geoip data could not be opened");
+    }
+    return lookup.getCountry(host).getCode();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
new file mode 100644
index 0000000..883725c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import com.google.gson.Gson;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.eval.ConstEval;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalNode.Type;
+import org.apache.tajo.engine.eval.EvalNodeVisitor;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.engine.planner.logical.IndexScanNode;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.storage.Fragment;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+
+/**
+ * Helpers for building index names and for turning a scan whose qualifier
+ * matches an available index into an index scan.
+ */
+public class IndexUtil {
+  /**
+   * Builds the index name of a fragment from the fragment's file name, start
+   * offset and length, followed by the column name of every sort key.
+   *
+   * @param fragment the fragment the index belongs to
+   * @param keys the sort keys of the index
+   * @return the index name
+   */
+  public static String getIndexNameOfFrag(Fragment fragment, SortSpec[] keys) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(fragment.getPath().getName()).append("_");
+    builder.append(fragment.getStartOffset()).append("_")
+        .append(fragment.getLength()).append("_");
+    for (SortSpec key : keys) {
+      builder.append(key.getSortKey().getColumnName()).append("_");
+    }
+    // Every part above already ends with "_", so the name ends in "__index".
+    // This is kept unchanged so that generated names stay the same.
+    builder.append("_index");
+    return builder.toString();
+  }
+
+  /**
+   * Builds an index name from a base name followed by the column name of
+   * every sort key, each part terminated by "_".
+   *
+   * @param indexName the base name
+   * @param keys the sort keys of the index
+   * @return the index name
+   */
+  public static String getIndexName (String indexName , SortSpec[] keys) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(indexName).append("_");
+    for (SortSpec key : keys) {
+      builder.append(key.getSortKey().getColumnName()).append("_");
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Picks, among the given indexes, the one with the most sort keys whose
+   * key columns all appear in "field = constant" / "field IS constant"
+   * predicates of the scan's qualifier, and builds an index scan for it.
+   *
+   * @param scanNode the scan to convert
+   * @param iter entries whose values are JSON-serialized {@code SortSpec[]}
+   *             arrays, one per candidate index
+   * @return the index scan node, or null if the scan has no qualifier or no
+   *         candidate index is usable
+   */
+  public static IndexScanNode indexEval(ScanNode scanNode,
+      Iterator<Entry<String, String>> iter) {
+
+    EvalNode qual = scanNode.getQual();
+    if (qual == null) {
+      // Without a qualifier there is no predicate an index could serve.
+      return null;
+    }
+    Gson gson = GsonCreator.getInstance();
+
+    FieldAndValueFinder nodeFinder = new FieldAndValueFinder();
+    qual.preOrder(nodeFinder);
+    LinkedList<EvalNode> nodeList = nodeFinder.getNodeList();
+
+    SortSpec[] maxIndex = null;
+    while (iter.hasNext()) {
+      SortSpec[] sortKey = gson.fromJson(iter.next().getValue(), SortSpec[].class);
+      if (sortKey == null || sortKey.length > nodeList.size()) {
+        // An index with more sort keys than usable predicates cannot be used.
+        continue;
+      }
+      boolean longerThanBest = maxIndex == null || sortKey.length > maxIndex.length;
+      if (longerThanBest && coversAllKeys(sortKey, nodeList)) {
+        maxIndex = sortKey;
+      }
+    }
+
+    if (maxIndex == null) {
+      return null;
+    }
+
+    Schema keySchema = new Schema();
+    for (SortSpec key : maxIndex) {
+      keySchema.addColumn(key.getSortKey());
+    }
+
+    // NOTE(review): the values are taken from every collected predicate, in
+    // predicate order, not per key column of the chosen index. Confirm that
+    // this is what IndexScanNode expects.
+    Datum[] datum = new Datum[nodeList.size()];
+    int pos = 0;
+    for (EvalNode node : nodeList) {
+      datum[pos++] = ((ConstEval) node.getRightExpr()).getValue();
+    }
+
+    return new IndexScanNode(scanNode, keySchema , datum , maxIndex);
+  }
+
+  /** Checks that every sort key column is the field of at least one predicate. */
+  private static boolean coversAllKeys(SortSpec[] sortKey, LinkedList<EvalNode> nodes) {
+    for (SortSpec key : sortKey) {
+      boolean found = false;
+      for (EvalNode node : nodes) {
+        Column col = ((FieldEval) node.getLeftExpr()).getColumnRef();
+        if (col.equals(key.getSortKey())) {
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Visitor that collects the EQUAL and IS nodes whose left side is a field
+   * and whose right side is a constant.
+   */
+  private static class FieldAndValueFinder implements EvalNodeVisitor {
+    private LinkedList<EvalNode> nodeList = new LinkedList<EvalNode>();
+
+    public LinkedList<EvalNode> getNodeList () {
+      return this.nodeList;
+    }
+
+    @Override
+    public void visit(EvalNode node) {
+      switch (node.getType()) {
+        case EQUAL:
+        case IS:
+          if (node.getLeftExpr().getType() == Type.FIELD
+              && node.getRightExpr().getType() == Type.CONST) {
+            nodeList.add(node);
+          }
+          break;
+        default:
+          // Other node types (AND included) contribute no predicate.
+          break;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java
new file mode 100644
index 0000000..91fdb29
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/HttpServer.java
@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.webapp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.Handler;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.handler.ContextHandlerCollection;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.jetty.servlet.*;
+import org.mortbay.jetty.webapp.WebAppContext;
+import org.mortbay.thread.QueuedThreadPool;
+import org.mortbay.util.MultiException;
+
+import javax.servlet.http.HttpServlet;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.BindException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Wrapper around an embedded Jetty {@link Server} that serves one web
+ * application plus the default "/logs" and "/static" contexts.
+ * This class is borrowed from Hadoop and is simplified to our objective.
+ */
+public class HttpServer {
+  private static final Log LOG = LogFactory.getLog(HttpServer.class);
+
+  protected final Server webServer;
+  protected final Connector listener;
+  protected final WebAppContext webAppContext;
+  protected final boolean findPort;
+  protected final Map<Context, Boolean> defaultContexts =
+      new HashMap<Context, Boolean>();
+  protected final List<String> filterNames = new ArrayList<String>();
+  private static final int MAX_RETRIES = 10;
+  private final boolean listenerStartedExternally;
+  static final String STATE_DESCRIPTION_ALIVE = " - alive";
+  static final String STATE_DESCRIPTION_NOT_LIVE = " - not live";
+
+  /**
+   * Creates the server without starting it.
+   *
+   * @param name the web application name; it is looked up as
+   *             "webapps/&lt;name&gt;" on the class path
+   * @param bindAddress the host to bind to (used only when connector is null)
+   * @param port the port to bind to (used only when connector is null)
+   * @param findPort whether {@link #start()} tries the next port when the
+   *                 requested one is in use
+   * @param connector an externally created connector, or null to create one
+   * @param conf the configuration; "tajo.http.maxthreads" is read from it
+   * @param pathSpecs not used by this constructor
+   * @throws IOException if the web application cannot be found
+   */
+  public HttpServer(String name, String bindAddress, int port,
+      boolean findPort, Connector connector, Configuration conf,
+      String[] pathSpecs) throws IOException {
+    this.webServer = new Server();
+    this.findPort = findPort;
+
+    if (connector == null) {
+      listenerStartedExternally = false;
+      listener = createBaseListener(conf);
+      listener.setHost(bindAddress);
+      listener.setPort(port);
+    } else {
+      listenerStartedExternally = true;
+      listener = connector;
+    }
+
+    webServer.addConnector(listener);
+
+    int maxThreads = conf.getInt("tajo.http.maxthreads", -1);
+    // If "tajo.http.maxthreads" is not configured, QueuedThreadPool() will
+    // use its own default value.
+    QueuedThreadPool threadPool = maxThreads == -1 ? new QueuedThreadPool()
+        : new QueuedThreadPool(maxThreads);
+    webServer.setThreadPool(threadPool);
+
+    final String appDir = getWebAppsPath(name);
+    ContextHandlerCollection contexts = new ContextHandlerCollection();
+    webServer.setHandler(contexts);
+
+    webAppContext = new WebAppContext();
+    webAppContext.setDisplayName(name);
+    webAppContext.setContextPath("/");
+    webAppContext.setWar(appDir + "/" + name);
+    webServer.addHandler(webAppContext);
+
+    addDefaultApps(contexts, appDir, conf);
+  }
+
+  /**
+   * Create a required listener for the Jetty instance listening on the port
+   * provided. This wrapper and all subclasses must create at least one
+   * listener.
+   */
+  public Connector createBaseListener(Configuration conf) throws IOException {
+    return HttpServer.createDefaultChannelConnector();
+  }
+
+  /** Creates the select-channel connector used when none is supplied. */
+  static Connector createDefaultChannelConnector() {
+    SelectChannelConnector ret = new SelectChannelConnector();
+    ret.setLowResourceMaxIdleTime(10000);
+    ret.setAcceptQueueSize(128);
+    ret.setResolveNames(false);
+    ret.setUseDirectBuffers(false);
+    return ret;
+  }
+
+  /**
+   * Add default apps.
+   *
+   * @param parent the handler collection the default contexts are added to
+   * @param appDir The application directory
+   * @param conf the configuration (not read by this implementation)
+   * @throws IOException
+   */
+  protected void addDefaultApps(ContextHandlerCollection parent,
+      final String appDir, Configuration conf) throws IOException {
+    // set up the context for "/logs/" if "tajo.log.dir" property is defined.
+    String logDir = System.getProperty("tajo.log.dir");
+    if (logDir != null) {
+      Context logContext = new Context(parent, "/logs");
+      logContext.setResourceBase(logDir);
+      //logContext.addServlet(AdminAuthorizedServlet.class, "/*");
+      logContext.setDisplayName("logs");
+      defaultContexts.put(logContext, true);
+    }
+    // set up the context for "/static/*"
+    Context staticContext = new Context(parent, "/static");
+    staticContext.setResourceBase(appDir + "/static");
+    staticContext.addServlet(DefaultServlet.class, "/*");
+    staticContext.setDisplayName("static");
+    defaultContexts.put(staticContext, true);
+  }
+
+  /**
+   * Adds a context as a handler of the web server and records its filter flag.
+   *
+   * @param ctxt the context to add
+   * @param isFiltered the flag recorded for the context
+   * @throws IOException
+   */
+  public void addContext(Context ctxt, boolean isFiltered)
+      throws IOException {
+    webServer.addHandler(ctxt);
+    defaultContexts.put(ctxt, isFiltered);
+  }
+
+  /**
+   * Add a context
+   * @param pathSpec The path spec for the context
+   * @param dir The directory containing the context
+   * @param isFiltered if true, the servlet is added to the filter path mapping
+   * @throws IOException
+   */
+  protected void addContext(String pathSpec, String dir, boolean isFiltered) throws IOException {
+    if (0 == webServer.getHandlers().length) {
+      throw new RuntimeException("Couldn't find handler");
+    }
+    WebAppContext webAppCtx = new WebAppContext();
+    webAppCtx.setContextPath(pathSpec);
+    webAppCtx.setWar(dir);
+    // Pass the caller's flag through instead of a hard-coded "true".
+    addContext(webAppCtx, isFiltered);
+  }
+
+  /**
+   * Set a value in the webapp context. These values are available to the jsp
+   * pages as "application.getAttribute(name)".
+   * @param name The name of the attribute
+   * @param value The value of the attribute
+   */
+  public void setAttribute(String name, Object value) {
+    webAppContext.setAttribute(name, value);
+  }
+
+  /**
+   * Add a servlet in the server.
+   * @param name The name of the servlet (can be passed as null)
+   * @param pathSpec The path spec for the servlet
+   * @param clazz The servlet class
+   */
+  public void addServlet(String name, String pathSpec,
+      Class<? extends HttpServlet> clazz) {
+    addInternalServlet(name, pathSpec, clazz, false);
+    addFilterPathMapping(pathSpec, webAppContext);
+  }
+
+  /**
+   * Add an internal servlet in the server, specifying whether or not to
+   * protect with Kerberos authentication.
+   * Note: This method is to be used for adding servlets that facilitate
+   * internal communication and not for user facing functionality. For
+   * servlets added using this method, filters (except internal Kerberized
+   * filters) are not enabled.
+   *
+   * @param name The name of the servlet (can be passed as null)
+   * @param pathSpec The path spec for the servlet
+   * @param clazz The servlet class
+   * @param requireAuth if true and security is enabled, the "krb5Filter"
+   *                    filter is mapped to the path spec
+   */
+  public void addInternalServlet(String name, String pathSpec,
+      Class<? extends HttpServlet> clazz, boolean requireAuth) {
+    ServletHolder holder = new ServletHolder(clazz);
+    if (name != null) {
+      holder.setName(name);
+    }
+    webAppContext.addServlet(holder, pathSpec);
+
+    if (requireAuth && UserGroupInformation.isSecurityEnabled()) {
+      LOG.info("Adding Kerberos filter to " + name);
+      ServletHandler handler = webAppContext.getServletHandler();
+      FilterMapping fmap = new FilterMapping();
+      fmap.setPathSpec(pathSpec);
+      fmap.setFilterName("krb5Filter");
+      fmap.setDispatches(Handler.ALL);
+      handler.addFilterMapping(fmap);
+    }
+  }
+
+  /**
+   * Add the path spec to the filter path mapping.
+   * @param pathSpec The path spec
+   * @param webAppCtx The WebApplicationContext to add to
+   */
+  protected void addFilterPathMapping(String pathSpec,
+      Context webAppCtx) {
+    ServletHandler handler = webAppCtx.getServletHandler();
+    for (String name : filterNames) {
+      FilterMapping fmap = new FilterMapping();
+      fmap.setPathSpec(pathSpec);
+      fmap.setFilterName(name);
+      fmap.setDispatches(Handler.ALL);
+      handler.addFilterMapping(fmap);
+    }
+  }
+
+  /**
+   * Locates "webapps/&lt;appName&gt;" on the class path.
+   *
+   * @param appName the web application name
+   * @return the URL string of the resource with its last path segment removed
+   * @throws FileNotFoundException if the resource is not on the class path
+   */
+  protected String getWebAppsPath(String appName) throws FileNotFoundException {
+    URL url = getClass().getClassLoader().getResource("webapps/" + appName);
+    if (url == null) {
+      throw new FileNotFoundException("webapps/" + appName
+          + " not found in CLASSPATH");
+    }
+    String urlString = url.toString();
+    return urlString.substring(0, urlString.lastIndexOf('/'));
+  }
+
+  /**
+   * Get the value in the webapp context.
+   * @param name The name of the attribute
+   * @return The value of the attribute
+   */
+  public Object getAttribute(String name) {
+    return webAppContext.getAttribute(name);
+  }
+
+  /**
+   * Get the port that the server is on
+   * @return the port
+   */
+  public int getPort() {
+    return webServer.getConnectors()[0].getLocalPort();
+  }
+
+  /**
+   * Set the min, max number of worker threads (simultaneous connections).
+   */
+  public void setThreads(int min, int max) {
+    QueuedThreadPool pool = (QueuedThreadPool) webServer.getThreadPool() ;
+    pool.setMinThreads(min);
+    pool.setMaxThreads(max);
+  }
+
+  /**
+   * Start the server. Does not wait for the server to start.
+   *
+   * @throws IOException if the server cannot be started; a
+   *         {@link BindException} if the port is in use and port searching
+   *         is disabled; an {@link InterruptedIOException} if the calling
+   *         thread is interrupted (its interrupt flag is set again)
+   */
+  public void start() throws IOException {
+    try {
+      if (listenerStartedExternally) { // Expect that listener was started
+                                       // securely
+        if (listener.getLocalPort() == -1) { // ... and verify
+          throw new Exception("Expected webserver's listener to be started "
+              + "previously but wasn't");
+        }
+        // And skip all the port rolling issues.
+        webServer.start();
+      } else {
+        int port;
+        int oriPort = listener.getPort(); // The original requested port
+        while (true) {
+          try {
+            port = webServer.getConnectors()[0].getLocalPort();
+            LOG.debug("Port returned by webServer.getConnectors()[0]."
+                + "getLocalPort() before open() is " + port
+                + ". Opening the listener on " + oriPort);
+            listener.open();
+            port = listener.getLocalPort();
+            LOG.debug("listener.getLocalPort() returned "
+                + listener.getLocalPort()
+                + " webServer.getConnectors()[0].getLocalPort() returned "
+                + webServer.getConnectors()[0].getLocalPort());
+            // Workaround to handle the problem reported in HADOOP-4744
+            if (port < 0) {
+              Thread.sleep(100);
+              int numRetries = 1;
+              while (port < 0) {
+                LOG.warn("listener.getLocalPort returned " + port);
+                if (numRetries++ > MAX_RETRIES) {
+                  throw new Exception(" listener.getLocalPort is returning "
+                      + "less than 0 even after " + numRetries + " resets");
+                }
+                for (int i = 0; i < 2; i++) {
+                  LOG.info("Retrying listener.getLocalPort()");
+                  port = listener.getLocalPort();
+                  if (port > 0) {
+                    break;
+                  }
+                  Thread.sleep(200);
+                }
+                if (port > 0) {
+                  break;
+                }
+                LOG.info("Bouncing the listener");
+                listener.close();
+                Thread.sleep(1000);
+                listener.setPort(oriPort == 0 ? 0 : (oriPort += 1));
+                listener.open();
+                Thread.sleep(100);
+                port = listener.getLocalPort();
+              }
+            } // Workaround end
+            LOG.info("Jetty bound to port " + port);
+            webServer.start();
+            break;
+          } catch (IOException ex) {
+            // if this is a bind exception,
+            // then try the next port number.
+            if (ex instanceof BindException) {
+              if (!findPort) {
+                BindException be = new BindException("Port in use: "
+                    + listener.getHost() + ":" + listener.getPort());
+                be.initCause(ex);
+                throw be;
+              }
+            } else {
+              LOG.info("HttpServer.start() threw a non Bind IOException");
+              throw ex;
+            }
+          } catch (MultiException ex) {
+            LOG.info("HttpServer.start() threw a MultiException");
+            throw ex;
+          }
+          // Reached only after a BindException with findPort enabled.
+          listener.setPort((oriPort += 1));
+        }
+      }
+      // Make sure there is no handler failures.
+      Handler[] handlers = webServer.getHandlers();
+      for (int i = 0; i < handlers.length; i++) {
+        if (handlers[i].isFailed()) {
+          throw new IOException(
+              "Problem in starting http server. Server handlers failed");
+        }
+      }
+    } catch (IOException e) {
+      throw e;
+    } catch (InterruptedException e) {
+      // Keep the interruption visible to the caller's thread.
+      Thread.currentThread().interrupt();
+      throw (IOException) new InterruptedIOException(
+          "Interrupted while starting HTTP server").initCause(e);
+    } catch (Exception e) {
+      throw new IOException("Problem starting http server", e);
+    }
+  }
+
+  /**
+   * stop the server
+   *
+   * The listener, the web app context and the web server are each stopped
+   * even if an earlier step fails; the failures are collected and thrown
+   * at the end.
+   */
+  public void stop() throws Exception {
+    MultiException exception = null;
+    try {
+      listener.close();
+    } catch (Exception e) {
+      LOG.error(
+          "Error while stopping listener for webapp "
+              + webAppContext.getDisplayName(), e);
+      exception = addMultiException(exception, e);
+    }
+
+    try {
+      // clear & stop webAppContext attributes to avoid memory leaks.
+      webAppContext.clearAttributes();
+      webAppContext.stop();
+    } catch (Exception e) {
+      LOG.error("Error while stopping web app context for webapp "
+          + webAppContext.getDisplayName(), e);
+      exception = addMultiException(exception, e);
+    }
+    try {
+      webServer.stop();
+    } catch (Exception e) {
+      LOG.error(
+          "Error while stopping web server for webapp "
+              + webAppContext.getDisplayName(), e);
+      exception = addMultiException(exception, e);
+    }
+
+    if (exception != null) {
+      exception.ifExceptionThrow();
+    }
+
+  }
+
+  /** Adds e to the given MultiException, creating the latter if it is null. */
+  private MultiException addMultiException(MultiException exception, Exception e) {
+    if (exception == null) {
+      exception = new MultiException();
+    }
+    exception.add(e);
+    return exception;
+  }
+
+  /** Waits on the underlying web server by delegating to its join(). */
+  public void join() throws InterruptedException {
+    webServer.join();
+  }
+
+  /**
+   * Test for the availability of the web server
+   *
+   * @return true if the web server is started, false otherwise
+   */
+  public boolean isAlive() {
+    return webServer != null && webServer.isStarted();
+  }
+
+  /**
+   * Return the host and port of the HttpServer, if live
+   *
+   * @return the classname and any HTTP URL
+   */
+  @Override
+  public String toString() {
+    return listener != null ? ("HttpServer at http://" + listener.getHost()
+        + ":" + listener.getLocalPort() + "/" + (isAlive() ? STATE_DESCRIPTION_ALIVE
+        : STATE_DESCRIPTION_NOT_LIVE))
+        : "Inactive HttpServer";
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
new file mode 100644
index 0000000..f185cc6
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.webapp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.mortbay.jetty.Connector;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.master.TajoMaster;
+
+import java.io.IOException;
+
+/**
+ * An {@link HttpServer} that is kept as a single static instance and that
+ * remembers the {@link TajoMaster} it was created for.
+ */
+public class StaticHttpServer extends HttpServer {
+  private static StaticHttpServer instance = null;
+  private TajoMaster master = null;
+
+  private StaticHttpServer(TajoMaster master , String name, String bindAddress, int port,
+      boolean findPort, Connector connector, Configuration conf,
+      String[] pathSpecs) throws IOException {
+    super(name, bindAddress, port, findPort, connector, conf, pathSpecs);
+    this.master = master;
+  }
+
+  /**
+   * @return the instance created by the other getInstance overload, or null
+   *         if none has been created yet
+   */
+  public static StaticHttpServer getInstance() {
+    return instance;
+  }
+
+  /**
+   * Returns the static instance, creating it on the first call. When the
+   * bind address is null or empty, the host part of
+   * {@link ConfVars#TASKRUNNER_LISTENER_ADDRESS} is used instead. On
+   * creation the attributes "tajo.master", "tajo.master.addr",
+   * "tajo.master.conf" and "tajo.master.starttime" are set.
+   * Later calls ignore their arguments and return the existing instance.
+   *
+   * @throws IOException if the server cannot be created
+   */
+  public static StaticHttpServer getInstance( TajoMaster master, String name,
+      String bindAddress, int port, boolean findPort, Connector connector,
+      TajoConf conf,
+      String[] pathSpecs) throws IOException {
+    if (instance != null) {
+      return instance;
+    }
+
+    boolean noBindAddress = bindAddress == null || bindAddress.length() == 0;
+    String addr = noBindAddress
+        ? conf.getVar(ConfVars.TASKRUNNER_LISTENER_ADDRESS).split(":")[0]
+        : bindAddress;
+
+    instance = new StaticHttpServer(master, name, addr, port,
+        findPort, connector, conf, pathSpecs);
+    instance.setAttribute("tajo.master", master);
+    instance.setAttribute("tajo.master.addr", addr);
+    instance.setAttribute("tajo.master.conf", conf);
+    instance.setAttribute("tajo.master.starttime", System.currentTimeMillis());
+    return instance;
+  }
+
+  /**
+   * @return the master given when the instance was created
+   */
+  public TajoMaster getMaster() {
+    return master;
+  }
+
+  /**
+   * Sets an attribute on the static instance.
+   *
+   * @param name the attribute name
+   * @param object the attribute value
+   */
+  public void set(String name, Object object) {
+    instance.setAttribute(name, object);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
new file mode 100644
index 0000000..4619fe4
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.*;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.Executors;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+/**
+ * Fetcher fetches data from a given URI via the HTTP protocol and stores it
+ * into a specific file. It aims at asynchronous and efficient data transfer.
+ */
+public class Fetcher {
+  private final static Log LOG = LogFactory.getLog(Fetcher.class);
+
+  private final URI uri;
+  private final File file;
+
+  private final String host;
+  private int port;
+
+  /**
+   * Creates a fetcher for the given source and destination.
+   *
+   * @param uri  the location to fetch; a missing scheme is treated as "http"
+   *             and a missing host as "localhost"
+   * @param file the local file the response body is written to
+   */
+  public Fetcher(URI uri, File file) {
+    this.uri = uri;
+    this.file = file;
+
+    String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
+    this.host = uri.getHost() == null ? "localhost" : uri.getHost();
+    this.port = uri.getPort();
+    if (port == -1) {
+      // No explicit port: fall back to the default port of the known schemes.
+      if (scheme.equalsIgnoreCase("http")) {
+        this.port = 80;
+      } else if (scheme.equalsIgnoreCase("https")) {
+        this.port = 443;
+      }
+    }
+  }
+
+  /**
+   * Connects to the host, sends a GET request for the URI and blocks until
+   * the connection has been closed.
+   *
+   * @return the destination file given to the constructor
+   * @throws IOException if the connection attempt fails
+   */
+  public File get() throws IOException {
+    ClientBootstrap bootstrap = new ClientBootstrap(
+        new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+            Executors.newCachedThreadPool()));
+    bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
+    bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
+    ChannelPipelineFactory factory = new HttpClientPipelineFactory(file);
+    bootstrap.setPipelineFactory(factory);
+
+    ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
+
+    // Wait until the connection attempt succeeds or fails.
+    Channel channel = future.awaitUninterruptibly().getChannel();
+    if (!future.isSuccess()) {
+      bootstrap.releaseExternalResources();
+      throw new IOException(future.getCause());
+    }
+
+    // Use the raw (still percent-encoded) path so that it is consistent with
+    // the raw query and escaped characters are not sent in decoded form.
+    String query = uri.getRawPath()
+        + (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : "");
+    // Prepare the HTTP request.
+    HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query);
+    request.setHeader(HttpHeaders.Names.HOST, host);
+    LOG.info("Fetch: " + uri);
+    request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+    request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
+
+    // Send the HTTP request.
+    channel.write(request);
+
+    // Wait for the server to close the connection.
+    channel.getCloseFuture().awaitUninterruptibly();
+
+    // Shut down executor threads to exit.
+    bootstrap.releaseExternalResources();
+
+    return file;
+  }
+
+  /**
+   * @return the URI this fetcher was created for
+   */
+  public URI getURI() {
+    return this.uri;
+  }
+
+  /**
+   * Upstream handler that writes the body of the HTTP response, either a
+   * single message or a sequence of chunks, into the destination file.
+   */
+  public static class HttpClientHandler extends SimpleChannelUpstreamHandler {
+    private volatile boolean readingChunks;
+    private final File file;
+    private RandomAccessFile raf;
+    private FileChannel fc;
+    private long length = -1;
+
+    /**
+     * @param file the file the response body is written to
+     * @throws FileNotFoundException declared for callers; the file itself is
+     *                               only opened once a response arrives
+     */
+    public HttpClientHandler(File file) throws FileNotFoundException {
+      this.file = file;
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      if (!readingChunks) {
+        HttpResponse response = (HttpResponse) e.getMessage();
+
+        StringBuilder sb = new StringBuilder();
+        if (LOG.isDebugEnabled()) {
+          sb.append("STATUS: ").append(response.getStatus())
+              .append(", VERSION: ").append(response.getProtocolVersion())
+              .append(", HEADER: ");
+        }
+        for (String name : response.getHeaderNames()) {
+          for (String value : response.getHeaders(name)) {
+            if (LOG.isDebugEnabled()) {
+              sb.append(name).append(" = ").append(value);
+            }
+            // HTTP header names are case-insensitive.
+            if (this.length == -1
+                && name.equalsIgnoreCase(HttpHeaders.Names.CONTENT_LENGTH)) {
+              this.length = Long.parseLong(value);
+            }
+          }
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sb.toString());
+        }
+
+        // Compare by value: the status object of a decoded response is not
+        // guaranteed to be the very same instance as the constant.
+        if (HttpResponseStatus.NO_CONTENT.equals(response.getStatus())) {
+          LOG.info("There are no data corresponding to the request");
+          return;
+        }
+
+        this.raf = new RandomAccessFile(file, "rw");
+        this.fc = raf.getChannel();
+
+        if (response.isChunked()) {
+          readingChunks = true;
+        } else {
+          // The whole body arrived in this message, so the file can be
+          // released as soon as it has been written.
+          try {
+            ChannelBuffer content = response.getContent();
+            if (content.readable()) {
+              fc.write(content.toByteBuffer());
+            }
+          } finally {
+            closeFile();
+          }
+        }
+      } else {
+        HttpChunk chunk = (HttpChunk) e.getMessage();
+        if (chunk.isLast()) {
+          readingChunks = false;
+          long fileLength = fc.position();
+          closeFile();
+          if (fileLength == length) {
+            LOG.info("Data fetch is done (total received bytes: " + fileLength
+                + ")");
+          } else {
+            LOG.info("Data fetch is done, but cannot get all data "
+                + "(received/total: " + fileLength + "/" + length + ")");
+          }
+        } else {
+          fc.write(chunk.getContent().toByteBuffer());
+        }
+      }
+    }
+
+    /**
+     * Releases the destination file if the connection goes away before the
+     * last chunk was received.
+     */
+    @Override
+    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
+        throws Exception {
+      try {
+        closeFile();
+      } finally {
+        super.channelClosed(ctx, e);
+      }
+    }
+
+    /** Closes the destination file if it is open; safe to call repeatedly. */
+    private void closeFile() throws IOException {
+      RandomAccessFile openFile = raf;
+      raf = null;
+      fc = null;
+      if (openFile != null) {
+        // Closing the file also closes the channel obtained from it.
+        openFile.close();
+      }
+    }
+  }
+
+  /**
+   * Builds the client pipeline: HTTP codec, content decompressor and the
+   * handler that stores the response into the given file.
+   */
+  public static class HttpClientPipelineFactory implements
+      ChannelPipelineFactory {
+    private final File file;
+
+    /**
+     * @param file the file handed to every {@link HttpClientHandler} created
+     */
+    public HttpClientPipelineFactory(File file) {
+      this.file = file;
+    }
+
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+      ChannelPipeline pipeline = pipeline();
+
+      pipeline.addLast("codec", new HttpClientCodec());
+      pipeline.addLast("inflater", new HttpContentDecompressor());
+      pipeline.addLast("handler", new HttpClientHandler(file));
+      return pipeline;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/InterDataRetriever.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
new file mode 100644
index 0000000..42ad875
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
+import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
+import org.apache.tajo.worker.dataserver.retriever.FileChunk;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Data retriever that maps a registered query unit id to a base directory
+ * and serves a single file from that directory per request.
+ *
+ * A request URI is expected to carry a query string with the parameters
+ * "qid" (the registered id) and "fn" (the file name below the base directory).
+ */
+@Deprecated
+public class InterDataRetriever implements DataRetriever {
+  private final Log LOG = LogFactory.getLog(InterDataRetriever.class);
+  private final Set<QueryUnitId> registered = Sets.newHashSet();
+  private final Map<String, String> map = Maps.newConcurrentMap();
+
+  public InterDataRetriever() {
+  }
+
+  /**
+   * Associates the id with a base directory unless it is already registered.
+   *
+   * @param id      the query unit id
+   * @param baseURI the base directory files of this id are served from
+   */
+  public void register(QueryUnitId id, String baseURI) {
+    synchronized (registered) {
+      if (!registered.contains(id)) {
+        map.put(id.toString(), baseURI);
+        registered.add(id);
+      }
+    }
+  }
+
+  /**
+   * Removes the association of the id, if there is one.
+   *
+   * @param id the query unit id
+   */
+  public void unregister(QueryUnitId id) {
+    synchronized (registered) {
+      if (registered.contains(id)) {
+        map.remove(id.toString());
+        registered.remove(id);
+      }
+    }
+  }
+
+  /**
+   * Resolves the file addressed by the "qid" and "fn" request parameters.
+   *
+   * @param ctx     the channel context (not used here)
+   * @param request the HTTP request whose URI carries the query string
+   * @return a single chunk covering the whole file
+   * @throws IllegalArgumentException if the URI has no query string or no
+   *                                  "qid" parameter
+   * @throws FileNotFoundException if the qid is unknown, "fn" is missing, or
+   *                               the file is hidden or does not exist
+   * @throws IOException if the file lies outside the base directory, is not
+   *                     a regular file, or its path cannot be resolved
+   * @see org.apache.tajo.worker.dataserver.retriever.DataRetriever#handle(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.handler.codec.http.HttpRequest)
+   */
+  @Override
+  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+      throws IOException {
+
+    int start = request.getUri().indexOf('?');
+    if (start < 0) {
+      throw new IllegalArgumentException("Wrong request: " + request.getUri());
+    }
+
+    String queryStr = request.getUri().substring(start + 1);
+    LOG.info("QUERY: " + queryStr);
+    String [] queries = queryStr.split("&");
+
+    String qid = null;
+    String fn = null;
+    String [] kv;
+    for (String query : queries) {
+      kv = query.split("=");
+      if (kv.length < 2) {
+        // A parameter without a value ("qid" or "qid=") carries nothing.
+        continue;
+      }
+      if (kv[0].equals("qid")) {
+        qid = kv[1];
+      } else if (kv[0].equals("fn")) {
+        fn = kv[1];
+      }
+    }
+
+    // The concurrent map rejects null keys, so fail with a clear message.
+    if (qid == null) {
+      throw new IllegalArgumentException("Wrong request: " + request.getUri());
+    }
+
+    String baseDir = map.get(qid);
+    if (baseDir == null) {
+      throw new FileNotFoundException("No such qid: " + qid);
+    }
+    if (fn == null) {
+      throw new FileNotFoundException("No such file: " + baseDir + "/" + fn);
+    }
+
+    File file = new File(baseDir + "/" + fn);
+
+    // "fn" comes from the request: make sure it cannot escape the base
+    // directory (for example through ".." segments).
+    String basePath = new File(baseDir).getCanonicalPath();
+    String basePrefix = basePath.endsWith(File.separator)
+        ? basePath : basePath + File.separator;
+    if (!file.getCanonicalPath().startsWith(basePrefix)) {
+      throw new FileAccessForbiddenException("Access denied: " + fn);
+    }
+
+    if (file.isHidden() || !file.exists()) {
+      throw new FileNotFoundException("No such file: " + baseDir + "/"
+          + file.getName());
+    }
+    if (!file.isFile()) {
+      throw new FileAccessForbiddenException("No such file: "
+          + baseDir + "/" + file.getName());
+    }
+
+    return new FileChunk[] {new FileChunk(file, 0, file.length())};
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
new file mode 100644
index 0000000..36e7353
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.worker.dataserver.retriever.FileChunk;
+import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Retriever handler that serves a file located directly below a fixed base
+ * directory.
+ */
+public class PartitionRetrieverHandler implements RetrieverHandler {
+  private final String baseDir;
+
+  /**
+   * @param baseDir the directory that requested file names are resolved against
+   */
+  public PartitionRetrieverHandler(String baseDir) {
+    this.baseDir = baseDir;
+  }
+
+  /**
+   * Builds a chunk covering the whole file named by the first "fn" value.
+   *
+   * @param kvs the request parameters; the first value of "fn" is the file name
+   * @return a chunk spanning the file from offset 0 to its current length
+   * @throws IOException declared by the interface
+   */
+  @Override
+  public FileChunk get(Map<String, List<String>> kvs) throws IOException {
+    // The file is not verified here; per the original note, that is left
+    // to AdvancedDataRetriever.
+    final List<String> fileNames = kvs.get("fn");
+    final String path = baseDir + "/" + fileNames.get(0);
+    final File target = new File(path);
+    final long length = target.length();
+
+    return new FileChunk(target, 0, length);
+  }
+}