You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 13:44:22 UTC

[19/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java
new file mode 100644
index 0000000..a32a913
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+
+/**
+ * Base class for metrics reporters.
+ *
+ * <p>Subclasses implement {@link #report} to emit one snapshot of the metrics.
+ * This class supplies helpers that group metrics by key prefix and render each
+ * group as a single line of the form
+ * {@code <dateTime> [<hostAndPort>] <type> [<groupName>] item(,item)*}, where each
+ * item is a {@code |}-separated list of {@code key.attribute=value} pairs
+ * (or a single {@code key=value} pair for counters and gauges).</p>
+ */
+public abstract class TajoMetricsReporter {
+  /**
+   * Reports one snapshot of metrics, each map keyed by the metric name.
+   *
+   * @param gauges     gauges to report
+   * @param counters   counters to report
+   * @param histograms histograms to report
+   * @param meters     meters to report
+   * @param timers     timers to report
+   */
+  public abstract void report(SortedMap<String, Gauge> gauges,
+                              SortedMap<String, Counter> counters,
+                              SortedMap<String, Histogram> histograms,
+                              SortedMap<String, Meter> meters,
+                              SortedMap<String, Timer> timers);
+
+  /**
+   * Groups metrics by the first two dot-separated tokens of their key.
+   *
+   * <p>A key with more than two tokens (e.g. {@code a.b.c.d}) is placed in the group
+   * named by its first two tokens ({@code a.b}) under the item name formed by the
+   * remaining tokens ({@code c.d}). Any other key is placed in the group named by the
+   * empty string, with the whole key as its item name. Tokens are produced by
+   * {@link String#split(String)}, so trailing empty tokens are dropped.</p>
+   *
+   * <p>Grouping does not depend on the iteration order of {@code metricsMap}: each
+   * entry is added to the map of its own group, so a group is never overwritten even
+   * if its keys are not adjacent (for instance under a custom comparator).</p>
+   *
+   * @param metricsMap metrics keyed by their full name
+   * @param <T>        the metric type
+   * @return a map from group name to the items (item name to metric) of that group
+   */
+  public <T> Map<String, Map<String, T>> findMetricsItemGroup(SortedMap<String, T> metricsMap) {
+    Map<String, Map<String, T>> metricsGroup = new HashMap<String, Map<String, T>>();
+
+    for (Map.Entry<String, T> entry : metricsMap.entrySet()) {
+      String key = entry.getKey();
+      String[] keyTokens = key.split("\\.");
+
+      String groupName;
+      String itemName;
+      if (keyTokens.length > 2) {
+        groupName = keyTokens[0] + "." + keyTokens[1];
+        StringBuilder item = new StringBuilder(keyTokens[2]);
+        for (int i = 3; i < keyTokens.length; i++) {
+          item.append('.').append(keyTokens[i]);
+        }
+        itemName = item.toString();
+      } else {
+        groupName = "";
+        itemName = key;
+      }
+
+      // Get-or-create the group's map instead of tracking the "previous" group.
+      Map<String, T> groupItems = metricsGroup.get(groupName);
+      if (groupItems == null) {
+        groupItems = new HashMap<String, T>();
+        metricsGroup.put(groupName, groupItems);
+      }
+      groupItems.put(itemName, entry.getValue());
+    }
+
+    return metricsGroup;
+  }
+
+  /**
+   * Renders a group of meters as one line.
+   *
+   * @param dateTime    text written at the start of the line
+   * @param hostAndPort written after {@code dateTime}; omitted when null or empty
+   * @param rateFactor  multiplier applied to every rate via {@link #convertRate}
+   * @param groupName   written after the type label; omitted when null or empty
+   * @param meters      meters of the group, keyed by item name
+   * @return the rendered line
+   */
+  protected String meterGroupToString(String dateTime, String hostAndPort, double rateFactor,
+                                      String groupName, Map<String, Meter> meters) {
+    StringBuilder sb = startLine(dateTime, hostAndPort, "meter", groupName);
+
+    String prefix = "";
+    for (Map.Entry<String, Meter> eachMeter : meters.entrySet()) {
+      String key = eachMeter.getKey();
+      Meter meter = eachMeter.getValue();
+      sb.append(prefix);
+      sb.append(key).append(".count=").append(meter.getCount()).append("|");
+      sb.append(key).append(".mean=")
+          .append(formatDecimal(convertRate(meter.getMeanRate(), rateFactor))).append("|");
+      sb.append(key).append(".1minute=")
+          .append(formatDecimal(convertRate(meter.getOneMinuteRate(), rateFactor))).append("|");
+      sb.append(key).append(".5minute=")
+          .append(formatDecimal(convertRate(meter.getFiveMinuteRate(), rateFactor))).append("|");
+      sb.append(key).append(".15minute=")
+          .append(formatDecimal(convertRate(meter.getFifteenMinuteRate(), rateFactor)));
+      prefix = ",";
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Renders a group of counters as one line of {@code key=count} items.
+   *
+   * @param dateTime    text written at the start of the line
+   * @param hostAndPort written after {@code dateTime}; omitted when null or empty
+   * @param rateFactor  not used by this method
+   * @param groupName   written after the type label; omitted when null or empty
+   * @param counters    counters of the group, keyed by item name
+   * @return the rendered line
+   */
+  protected String counterGroupToString(String dateTime, String hostAndPort, double rateFactor,
+                                        String groupName, Map<String, Counter> counters) {
+    StringBuilder sb = startLine(dateTime, hostAndPort, "counter", groupName);
+
+    String prefix = "";
+    for (Map.Entry<String, Counter> eachCounter : counters.entrySet()) {
+      sb.append(prefix);
+      sb.append(eachCounter.getKey()).append("=").append(eachCounter.getValue().getCount());
+      prefix = ",";
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Renders a group of gauges as one line of {@code key=value} items.
+   *
+   * @param dateTime    text written at the start of the line
+   * @param hostAndPort written after {@code dateTime}; omitted when null or empty
+   * @param rateFactor  not used by this method
+   * @param groupName   written after the type label; omitted when null or empty
+   * @param gauges      gauges of the group, keyed by item name
+   * @return the rendered line
+   */
+  protected String gaugeGroupToString(String dateTime, String hostAndPort, double rateFactor,
+                                      String groupName, Map<String, Gauge> gauges) {
+    // NOTE(review): "guage" is misspelled but is part of the emitted line format, so it is
+    // kept byte-for-byte; confirm no consumer matches on it before correcting the spelling.
+    StringBuilder sb = startLine(dateTime, hostAndPort, "guage", groupName);
+
+    String prefix = "";
+    for (Map.Entry<String, Gauge> eachGauge : gauges.entrySet()) {
+      sb.append(prefix).append(eachGauge.getKey()).append("=").append(eachGauge.getValue().getValue());
+      prefix = ",";
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Renders a group of histograms as one line.
+   *
+   * <p>Like the other renderers, the line includes {@code groupName} after the type
+   * label, so that item names can be related back to their group.</p>
+   *
+   * @param dateTime    text written at the start of the line
+   * @param hostAndPort written after {@code dateTime}; omitted when null or empty
+   * @param rateFactor  not used by this method
+   * @param groupName   written after the type label; omitted when null or empty
+   * @param histograms  histograms of the group, keyed by item name
+   * @return the rendered line
+   */
+  protected String histogramGroupToString(String dateTime, String hostAndPort, double rateFactor,
+                                          String groupName, Map<String, Histogram> histograms) {
+    StringBuilder sb = startLine(dateTime, hostAndPort, "histo", groupName);
+
+    String prefix = "";
+    for (Map.Entry<String, Histogram> eachHistogram : histograms.entrySet()) {
+      String key = eachHistogram.getKey();
+      Histogram histogram = eachHistogram.getValue();
+      sb.append(prefix);
+      sb.append(key).append(".count=").append(histogram.getCount()).append("|");
+
+      Snapshot snapshot = histogram.getSnapshot();
+
+      sb.append(key).append(".min=").append(snapshot.getMin()).append("|");
+      sb.append(key).append(".max=").append(snapshot.getMax()).append("|");
+      sb.append(key).append(".mean=").append(formatDecimal(snapshot.getMean())).append("|");
+      sb.append(key).append(".stddev=").append(formatDecimal(snapshot.getStdDev())).append("|");
+      sb.append(key).append(".median=").append(formatDecimal(snapshot.getMedian())).append("|");
+      sb.append(key).append(".75%=").append(formatDecimal(snapshot.get75thPercentile())).append("|");
+      sb.append(key).append(".95%=").append(formatDecimal(snapshot.get95thPercentile())).append("|");
+      sb.append(key).append(".98%=").append(formatDecimal(snapshot.get98thPercentile())).append("|");
+      sb.append(key).append(".99%=").append(formatDecimal(snapshot.get99thPercentile())).append("|");
+      sb.append(key).append(".999%=").append(formatDecimal(snapshot.get999thPercentile()));
+      prefix = ",";
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Renders a group of timers as one line.
+   *
+   * @param dateTime    text written at the start of the line
+   * @param hostAndPort written after {@code dateTime}; omitted when null or empty
+   * @param rateFactor  multiplier applied to every reported value via {@link #convertRate}
+   * @param groupName   written after the type label; omitted when null or empty
+   * @param timers      timers of the group, keyed by item name
+   * @return the rendered line
+   */
+  protected String timerGroupToString(String dateTime, String hostAndPort, double rateFactor,
+                                 String groupName, Map<String, Timer> timers) {
+    StringBuilder sb = startLine(dateTime, hostAndPort, "timer", groupName);
+
+    String prefix = "";
+    for (Map.Entry<String, Timer> eachTimer : timers.entrySet()) {
+      String key = eachTimer.getKey();
+      Timer timer = eachTimer.getValue();
+      Snapshot snapshot = timer.getSnapshot();
+
+      sb.append(prefix);
+      sb.append(key).append(".count=").append(timer.getCount()).append("|");
+      sb.append(key).append(".meanrate=")
+          .append(formatDecimal(convertRate(timer.getMeanRate(), rateFactor))).append("|");
+      sb.append(key).append(".1minuterate=")
+          .append(formatDecimal(convertRate(timer.getOneMinuteRate(), rateFactor))).append("|");
+      sb.append(key).append(".5minuterate=")
+          .append(formatDecimal(convertRate(timer.getFiveMinuteRate(), rateFactor))).append("|");
+      sb.append(key).append(".15minuterate=")
+          .append(formatDecimal(convertRate(timer.getFifteenMinuteRate(), rateFactor))).append("|");
+      // NOTE(review): the snapshot values below are durations, yet they are scaled with the
+      // rate factor. A duration factor is not available through this signature, so the
+      // original behavior is kept; confirm whether a duration conversion was intended.
+      sb.append(key).append(".min=")
+          .append(formatDecimal(convertRate(snapshot.getMin(), rateFactor))).append("|");
+      sb.append(key).append(".max=")
+          .append(formatDecimal(convertRate(snapshot.getMax(), rateFactor))).append("|");
+      sb.append(key).append(".mean=")
+          .append(formatDecimal(convertRate(snapshot.getMean(), rateFactor))).append("|");
+      sb.append(key).append(".stddev=")
+          .append(formatDecimal(convertRate(snapshot.getStdDev(), rateFactor))).append("|");
+      sb.append(key).append(".median=")
+          .append(formatDecimal(convertRate(snapshot.getMedian(), rateFactor))).append("|");
+      sb.append(key).append(".75%=")
+          .append(formatDecimal(convertRate(snapshot.get75thPercentile(), rateFactor))).append("|");
+      sb.append(key).append(".95%=")
+          .append(formatDecimal(convertRate(snapshot.get95thPercentile(), rateFactor))).append("|");
+      sb.append(key).append(".98%=")
+          .append(formatDecimal(convertRate(snapshot.get98thPercentile(), rateFactor))).append("|");
+      sb.append(key).append(".99%=")
+          .append(formatDecimal(convertRate(snapshot.get99thPercentile(), rateFactor))).append("|");
+      sb.append(key).append(".999%=")
+          .append(formatDecimal(convertRate(snapshot.get999thPercentile(), rateFactor)));
+      prefix = ",";
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Scales a rate.
+   *
+   * @param rate       the rate to scale
+   * @param rateFactor the multiplier
+   * @return {@code rate * rateFactor}
+   */
+  protected double convertRate(double rate, double rateFactor) {
+    return rate * rateFactor;
+  }
+
+  /**
+   * Starts a report line: {@code <dateTime> [<hostAndPort>] <metricType> [<groupName>] }.
+   * The host/port and the group name are skipped when null or empty.
+   */
+  private static StringBuilder startLine(String dateTime, String hostAndPort,
+                                         String metricType, String groupName) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(dateTime).append(" ");
+    if (hostAndPort != null && !hostAndPort.isEmpty()) {
+      sb.append(hostAndPort).append(" ");
+    }
+    sb.append(metricType).append(" ");
+    if (groupName != null && !groupName.isEmpty()) {
+      sb.append(groupName).append(" ");
+    }
+    return sb;
+  }
+
+  /**
+   * Formats a value with two fraction digits. The locale is fixed so that the decimal
+   * separator is always '.', and can never be confused with the ',' item separator.
+   */
+  private static String formatDecimal(double value) {
+    return String.format(java.util.Locale.US, "%2.2f", value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
new file mode 100644
index 0000000..f11d520
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.metrics.GroupNameMetricsFilter;
+import org.apache.tajo.util.metrics.MetricsFilterList;
+import org.apache.tajo.util.metrics.RegexpMetricsFilter;
+
+import java.io.Closeable;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A {@link TajoMetricsReporter} that reports the metrics of a {@link MetricRegistry}
+ * periodically on its own single daemon thread.
+ *
+ * <p>Typical use is {@link #init}, then {@link #start()}, and finally {@link #stop()}
+ * (or {@link #close()}).</p>
+ */
+public abstract class TajoMetricsScheduledReporter extends TajoMetricsReporter implements Closeable {
+  private static final Log LOG = LogFactory.getLog(TajoMetricsScheduledReporter.class);
+
+  /** Suffix of the property that holds the report period in seconds. */
+  public static final String PERIOD_KEY = "period";
+
+  /** Report period, in seconds, used when the period property is absent or malformed. */
+  private static final long DEFAULT_PERIOD_SECONDS = 60;
+
+  protected MetricRegistry registry;
+  protected ScheduledExecutorService executor;
+  protected MetricFilter filter;
+  protected double durationFactor;
+  protected String durationUnit;
+  protected double rateFactor;
+  protected String rateUnit;
+  protected Map<String, String> metricsProperties;
+  protected String metricsName;
+  // Property key prefix: "<metricsName>.<reporterName>."
+  protected String metricsPropertyKey;
+  protected String hostAndPort;
+  protected long period;
+
+  /**
+   * @return the reporter name used as the second component of this reporter's property keys
+   */
+  protected abstract String getReporterName();
+
+  /** Hook invoked at the very end of {@link #init}. */
+  protected abstract void afterInit();
+
+  /** Creates daemon threads named {@code metrics-<name>-thread-<n>} at normal priority. */
+  private static class NamedThreadFactory implements ThreadFactory {
+    private final ThreadGroup group;
+    private final AtomicInteger threadNumber = new AtomicInteger(1);
+    private final String namePrefix;
+
+    private NamedThreadFactory(String name) {
+      final SecurityManager s = System.getSecurityManager();
+      this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+      this.namePrefix = "metrics-" + name + "-thread-";
+    }
+
+    @Override
+    public Thread newThread(Runnable r) {
+      final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
+      t.setDaemon(true);
+      if (t.getPriority() != Thread.NORM_PRIORITY) {
+        t.setPriority(Thread.NORM_PRIORITY);
+      }
+      return t;
+    }
+  }
+
+  /**
+   * @return the report period; in seconds when set by {@link #init} or {@link #start()}
+   */
+  public long getPeriod() {
+    return period;
+  }
+
+  /**
+   * Initializes this reporter.
+   *
+   * <p>Properties are looked up under the prefix
+   * {@code <metricsName>.<reporterName>.}: the value of every property whose key starts
+   * with {@code <prefix>regexp.} is handed to a {@link RegexpMetricsFilter}, and
+   * {@code <prefix>period} sets the report period in seconds (default 60). A period
+   * value that is not a valid integer is logged and replaced by the default.</p>
+   *
+   * @param registry          the registry whose metrics are reported
+   * @param metricsName       name passed to the {@link GroupNameMetricsFilter} and used
+   *                          in the property prefix and the thread name
+   * @param hostAndPort       stored in {@link #hostAndPort} for use by subclasses
+   * @param metricsProperties reporter configuration; must not be null
+   */
+  public void init(MetricRegistry registry,
+                   String metricsName,
+                   String hostAndPort,
+                   Map<String, String> metricsProperties) {
+    this.registry = registry;
+    this.metricsName = metricsName;
+    this.executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(metricsName));
+    // Rates are scaled per second, so the unit label is derived from SECONDS as well.
+    this.rateFactor = TimeUnit.SECONDS.toSeconds(1);
+    this.rateUnit = calculateRateUnit(TimeUnit.SECONDS);
+    this.durationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
+    this.durationUnit = TimeUnit.MILLISECONDS.toString().toLowerCase(Locale.US);
+    this.metricsProperties = metricsProperties;
+    this.metricsPropertyKey = metricsName + "." + getReporterName() + ".";
+    this.hostAndPort = hostAndPort;
+
+    MetricsFilterList filterList = new MetricsFilterList();
+    filterList.addMetricFilter(new GroupNameMetricsFilter(metricsName));
+
+    String regexpFilterKey = metricsPropertyKey + "regexp.";
+    Set<String> regexpExpressions = new HashSet<String>();
+
+    for (Map.Entry<String, String> entry : metricsProperties.entrySet()) {
+      if (entry.getKey().startsWith(regexpFilterKey)) {
+        regexpExpressions.add(entry.getValue());
+      }
+    }
+
+    if (!regexpExpressions.isEmpty()) {
+      filterList.addMetricFilter(new RegexpMetricsFilter(regexpExpressions));
+    }
+    this.filter = filterList;
+
+    this.period = DEFAULT_PERIOD_SECONDS;
+    String periodKey = metricsPropertyKey + PERIOD_KEY;
+    String periodValue = metricsProperties.get(periodKey);
+    if (periodValue != null) {
+      try {
+        this.period = Long.parseLong(periodValue.trim());
+      } catch (NumberFormatException e) {
+        // A malformed setting must not abort initialization; keep the default instead.
+        LOG.warn("Invalid value '" + periodValue + "' for " + periodKey
+            + ", using default period of " + DEFAULT_PERIOD_SECONDS + " seconds", e);
+      }
+    }
+    afterInit();
+  }
+
+  /** Starts the reporter polling every {@link #getPeriod()} seconds. */
+  public void start() {
+    start(period, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Starts the reporter polling at the given period.
+   *
+   * @param period the amount of time between polls
+   * @param unit   the unit for {@code period}
+   */
+  public void start(long period, TimeUnit unit) {
+    // NOTE(review): the value is stored as given, without converting from 'unit', so
+    // getPeriod() is only in seconds when unit is SECONDS.
+    this.period = period;
+    executor.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          report();
+        } catch (Exception e) {
+          // Must not propagate: an exception escaping a fixed-rate task suppresses all
+          // of its subsequent executions.
+          LOG.warn("Metric report error:" + e.getMessage(), e);
+        }
+      }
+    }, period, period, unit);
+  }
+
+  /**
+   * Stops the reporter and shuts down its thread of execution, waiting up to one second
+   * for a running report to finish. If the wait is interrupted, the interrupt status of
+   * the calling thread is restored.
+   */
+  public void stop() {
+    executor.shutdown();
+    try {
+      executor.awaitTermination(1, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // Keep the interruption visible to the caller instead of swallowing it.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Stops the reporter and shuts down its thread of execution.
+   */
+  @Override
+  public void close() {
+    stop();
+  }
+
+  /**
+   * Report the current values of all metrics in the registry.
+   */
+  public void report() {
+    report(registry.getGauges(filter),
+        registry.getCounters(filter),
+        registry.getHistograms(filter),
+        registry.getMeters(filter),
+        registry.getTimers(filter));
+  }
+
+  /** @return the singular, lower-case name of the rate time unit */
+  protected String getRateUnit() {
+    return rateUnit;
+  }
+
+  /** @return the lower-case name of the duration time unit */
+  protected String getDurationUnit() {
+    return durationUnit;
+  }
+
+  /**
+   * @param duration a duration to scale
+   * @return {@code duration * durationFactor}
+   */
+  protected double convertDuration(double duration) {
+    return duration * durationFactor;
+  }
+
+  /**
+   * @param rate a rate to scale
+   * @return {@code rate * rateFactor}
+   */
+  protected double convertRate(double rate) {
+    return rate * rateFactor;
+  }
+
+  /** Returns the lower-case unit name without its last character ("seconds" -> "second"). */
+  private String calculateRateUnit(TimeUnit unit) {
+    final String s = unit.toString().toLowerCase(Locale.US);
+    return s.substring(0, s.length() - 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/webapp/HttpServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/HttpServer.java b/tajo-core/src/main/java/org/apache/tajo/webapp/HttpServer.java
new file mode 100644
index 0000000..60faef2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/HttpServer.java
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.webapp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.Handler;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.SessionIdManager;
+import org.mortbay.jetty.handler.ContextHandlerCollection;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.jetty.servlet.*;
+import org.mortbay.jetty.webapp.WebAppContext;
+import org.mortbay.thread.QueuedThreadPool;
+import org.mortbay.util.MultiException;
+
+import javax.servlet.http.HttpServlet;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.BindException;
+import java.net.URL;
+import java.util.*;
+
+/**
+ * An embedded Jetty HTTP server hosting one web application plus the default
+ * "/static" (and, when configured, "/logs") contexts.
+ *
+ * This class is borrowed from Hadoop and is simplified to our objective.
+ */
+public class HttpServer {
+  private static final Log LOG = LogFactory.getLog(HttpServer.class);
+
+  protected final Server webServer;
+  protected final Connector listener;
+  protected final WebAppContext webAppContext;
+  // When true, start() tries successive port numbers if the requested one is in use.
+  protected final boolean findPort;
+  protected final Map<Context, Boolean> defaultContexts =
+      new HashMap<Context, Boolean>();
+  protected final List<String> filterNames = new ArrayList<String>();
+  private static final int MAX_RETRIES = 10;
+  // True when the connector was supplied by the caller instead of created here.
+  private final boolean listenerStartedExternally;
+  static final String STATE_DESCRIPTION_ALIVE = " - alive";
+  static final String STATE_DESCRIPTION_NOT_LIVE = " - not live";
+
+  /**
+   * Creates the server; it is not started until {@link #start()} is called.
+   *
+   * @param name        name of the web application; its resources are looked up as
+   *                    {@code webapps/<name>} on the classpath
+   * @param bindAddress host to bind to (used only when {@code connector} is null)
+   * @param port        port to bind to (used only when {@code connector} is null)
+   * @param findPort    whether {@link #start()} may move to the next port when the
+   *                    requested one is already in use
+   * @param connector   connector to use, or null to create a default one
+   * @param conf        configuration; {@code tajo.http.maxthreads} sets the size of the
+   *                    thread pool when present
+   * @param pathSpecs   not used by this constructor
+   * @throws IOException if the web application directory cannot be found
+   */
+  public HttpServer(String name, String bindAddress, int port,
+      boolean findPort, Connector connector, Configuration conf,
+      String[] pathSpecs) throws IOException {
+    this.webServer = new Server();
+    this.findPort = findPort;
+
+    if (connector == null) {
+      listenerStartedExternally = false;
+      listener = createBaseListener(conf);
+      listener.setHost(bindAddress);
+      listener.setPort(port);
+
+    } else {
+      listenerStartedExternally = true;
+      listener = connector;
+    }
+    webServer.addConnector(listener);
+
+    // Session ids must be unpredictable; a Random seeded with the current time is guessable.
+    SessionIdManager sessionIdManager = new HashSessionIdManager(new java.security.SecureRandom());
+    webServer.setSessionIdManager(sessionIdManager);
+
+    int maxThreads = conf.getInt("tajo.http.maxthreads", -1);
+    // If tajo.http.maxthreads is not configured, QueuedThreadPool() will use its
+    // default value.
+    QueuedThreadPool threadPool = maxThreads == -1 ? new QueuedThreadPool()
+        : new QueuedThreadPool(maxThreads);
+    webServer.setThreadPool(threadPool);
+
+    final String appDir = getWebAppsPath(name);
+    ContextHandlerCollection contexts = new ContextHandlerCollection();
+
+    webAppContext = new WebAppContext();
+    webAppContext.setDisplayName(name);
+    webAppContext.setContextPath("/");
+    webAppContext.setResourceBase(appDir + "/" + name);
+    webAppContext.setDescriptor(appDir + "/" + name + "/WEB-INF/web.xml");
+
+    contexts.addHandler(webAppContext);
+    webServer.setHandler(contexts);
+
+    addDefaultApps(contexts, appDir, conf);
+  }
+
+  /**
+   * Create a required listener for the Jetty instance listening on the port
+   * provided. This wrapper and all subclasses must create at least one
+   * listener.
+   */
+  public Connector createBaseListener(Configuration conf) throws IOException {
+    return HttpServer.createDefaultChannelConnector();
+  }
+
+  /** Creates the default NIO connector used when the caller supplies none. */
+  static Connector createDefaultChannelConnector() {
+    SelectChannelConnector ret = new SelectChannelConnector();
+    ret.setLowResourceMaxIdleTime(10000);
+    ret.setAcceptQueueSize(128);
+    ret.setResolveNames(false);
+    ret.setUseDirectBuffers(false);
+    return ret;
+  }
+
+  /**
+   * Add default apps.
+   *
+   * @param parent
+   *          The handler collection the default contexts are added to
+   * @param appDir
+   *          The application directory
+   * @param conf
+   *          The configuration (not used by this implementation)
+   * @throws IOException
+   */
+  protected void addDefaultApps(ContextHandlerCollection parent,
+      final String appDir, Configuration conf) throws IOException {
+    // set up the context for "/logs/" if the "tajo.log.dir" system property is defined.
+    String logDir = System.getProperty("tajo.log.dir");
+    if (logDir != null) {
+      Context logContext = new Context(parent, "/logs");
+      logContext.setResourceBase(logDir);
+      //logContext.addServlet(AdminAuthorizedServlet.class, "/*");
+      logContext.setDisplayName("logs");
+      defaultContexts.put(logContext, true);
+    }
+    // set up the context for "/static/*"
+    Context staticContext = new Context(parent, "/static");
+    staticContext.setResourceBase(appDir + "/static");
+    staticContext.addServlet(DefaultServlet.class, "/*");
+    staticContext.setDisplayName("static");
+    defaultContexts.put(staticContext, true);
+  }
+
+  /**
+   * Add a context to the server.
+   * @param ctxt The context to add
+   * @param isFiltered The flag recorded for the context in {@link #defaultContexts}
+   * @throws IOException
+   */
+  public void addContext(Context ctxt, boolean isFiltered)
+      throws IOException {
+    webServer.addHandler(ctxt);
+    defaultContexts.put(ctxt, isFiltered);
+  }
+
+  /**
+   * Add a context
+   * @param pathSpec The path spec for the context
+   * @param dir The directory containing the context
+   * @param isFiltered if true, the servlet is added to the filter path mapping
+   * @throws IOException
+   */
+  protected void addContext(String pathSpec, String dir, boolean isFiltered) throws IOException {
+    if (0 == webServer.getHandlers().length) {
+      throw new RuntimeException("Couldn't find handler");
+    }
+    WebAppContext webAppCtx = new WebAppContext();
+    webAppCtx.setContextPath(pathSpec);
+    webAppCtx.setWar(dir);
+    // Honor the caller's flag rather than always registering the context as filtered.
+    addContext(webAppCtx, isFiltered);
+  }
+
+  /**
+   * Set a value in the webapp context. These values are available to the jsp
+   * pages as "application.getAttribute(name)".
+   * @param name The name of the attribute
+   * @param value The value of the attribute
+   */
+  public void setAttribute(String name, Object value) {
+    webAppContext.setAttribute(name, value);
+  }
+
+  /**
+   * Add a servlet in the server.
+   * @param name The name of the servlet (can be passed as null)
+   * @param pathSpec The path spec for the servlet
+   * @param clazz The servlet class
+   */
+  public void addServlet(String name, String pathSpec,
+      Class<? extends HttpServlet> clazz) {
+    addInternalServlet(name, pathSpec, clazz, false);
+    addFilterPathMapping(pathSpec, webAppContext);
+  }
+
+  /**
+   * Add an internal servlet in the server, specifying whether or not to
+   * protect with Kerberos authentication.
+   * Note: This method is to be used for adding servlets that facilitate
+   * internal communication and not for user facing functionality. For
+   * servlets added using this method, filters (except internal Kerberized
+   * filters) are not enabled.
+   *
+   * @param name The name of the servlet (can be passed as null)
+   * @param pathSpec The path spec for the servlet
+   * @param clazz The servlet class
+   * @param requireAuth if true and security is enabled, the "krb5Filter" filter
+   *                    is mapped to {@code pathSpec}
+   */
+  public void addInternalServlet(String name, String pathSpec,
+      Class<? extends HttpServlet> clazz, boolean requireAuth) {
+    ServletHolder holder = new ServletHolder(clazz);
+    if (name != null) {
+      holder.setName(name);
+    }
+    webAppContext.addServlet(holder, pathSpec);
+
+    if (requireAuth && UserGroupInformation.isSecurityEnabled()) {
+      LOG.info("Adding Kerberos filter to " + name);
+      ServletHandler handler = webAppContext.getServletHandler();
+      FilterMapping fmap = new FilterMapping();
+      fmap.setPathSpec(pathSpec);
+      fmap.setFilterName("krb5Filter");
+      fmap.setDispatches(Handler.ALL);
+      handler.addFilterMapping(fmap);
+    }
+  }
+
+  /**
+   * Add the path spec to the filter path mapping.
+   * @param pathSpec The path spec
+   * @param webAppCtx The WebApplicationContext to add to
+   */
+  protected void addFilterPathMapping(String pathSpec,
+      Context webAppCtx) {
+    ServletHandler handler = webAppCtx.getServletHandler();
+    for (String name : filterNames) {
+      FilterMapping fmap = new FilterMapping();
+      fmap.setPathSpec(pathSpec);
+      fmap.setFilterName(name);
+      fmap.setDispatches(Handler.ALL);
+      handler.addFilterMapping(fmap);
+    }
+  }
+
+  /**
+   * Locate the directory that contains the web application.
+   * @param name The name of the web application
+   * @return The URL string of the parent of the {@code webapps/<name>} resource
+   * @throws FileNotFoundException if {@code webapps/<name>} is not on the classpath
+   */
+  protected String getWebAppsPath(String name) throws FileNotFoundException {
+    URL url = getClass().getClassLoader().getResource("webapps/" + name);
+    if (url == null) {
+      throw new FileNotFoundException("webapps/" + name + " not found in CLASSPATH");
+    }
+    String urlString = url.toString();
+    return urlString.substring(0, urlString.lastIndexOf('/'));
+  }
+
+  /**
+   * Get the value in the webapp context.
+   * @param name The name of the attribute
+   * @return The value of the attribute
+   */
+  public Object getAttribute(String name) {
+    return webAppContext.getAttribute(name);
+  }
+
+  /**
+   * Get the port that the server is on
+   * @return the port
+   */
+  public int getPort() {
+    return webServer.getConnectors()[0].getLocalPort();
+  }
+
+  /**
+   * Set the min, max number of worker threads (simultaneous connections).
+   */
+  public void setThreads(int min, int max) {
+    QueuedThreadPool pool = (QueuedThreadPool) webServer.getThreadPool();
+    pool.setMinThreads(min);
+    pool.setMaxThreads(max);
+  }
+
+  /**
+   * Start the server. Does not wait for the server to start.
+   *
+   * <p>When the listener was created by this class, the listener is opened first; on a
+   * {@link BindException} the next port number is tried if {@code findPort} is set,
+   * otherwise a {@link BindException} naming the host and port is thrown.</p>
+   *
+   * @throws IOException if the server cannot be started; an
+   *         {@link InterruptedIOException} if the calling thread was interrupted, in
+   *         which case the thread's interrupt status is also restored
+   */
+  public void start() throws IOException {
+    try {
+      if (listenerStartedExternally) { // Expect that listener was started
+                                       // securely
+        if (listener.getLocalPort() == -1) { // ... and verify
+          throw new Exception("Exepected webserver's listener to be started "
+              + "previously but wasn't");
+        }
+        // And skip all the port rolling issues.
+        webServer.start();
+      } else {
+        int port;
+        int oriPort = listener.getPort(); // The original requested port
+        while (true) {
+          try {
+            port = webServer.getConnectors()[0].getLocalPort();
+            LOG.debug("Port returned by webServer.getConnectors()[0]."
+                + "getLocalPort() before open() is " + port
+                + ". Opening the listener on " + oriPort);
+            listener.open();
+            port = listener.getLocalPort();
+            LOG.debug("listener.getLocalPort() returned "
+                + listener.getLocalPort()
+                + " webServer.getConnectors()[0].getLocalPort() returned "
+                + webServer.getConnectors()[0].getLocalPort());
+            // Workaround to handle the problem reported in HADOOP-4744
+            if (port < 0) {
+              Thread.sleep(100);
+              int numRetries = 1;
+              while (port < 0) {
+                LOG.warn("listener.getLocalPort returned " + port);
+                if (numRetries++ > MAX_RETRIES) {
+                  throw new Exception(" listener.getLocalPort is returning "
+                      + "less than 0 even after " + numRetries + " resets");
+                }
+                for (int i = 0; i < 2; i++) {
+                  LOG.info("Retrying listener.getLocalPort()");
+                  port = listener.getLocalPort();
+                  if (port > 0) {
+                    break;
+                  }
+                  Thread.sleep(200);
+                }
+                if (port > 0) {
+                  break;
+                }
+                LOG.info("Bouncing the listener");
+                listener.close();
+                Thread.sleep(1000);
+                listener.setPort(oriPort == 0 ? 0 : (oriPort += 1));
+                listener.open();
+                Thread.sleep(100);
+                port = listener.getLocalPort();
+              }
+            } // Workaround end
+            LOG.info("Jetty bound to port " + port);
+            webServer.start();
+            break;
+          } catch (IOException ex) {
+            // if this is a bind exception,
+            // then try the next port number.
+            if (ex instanceof BindException) {
+              if (!findPort) {
+                BindException be = new BindException("Port in use: "
+                    + listener.getHost() + ":" + listener.getPort());
+                be.initCause(ex);
+                throw be;
+              }
+            } else {
+              LOG.info("HttpServer.start() threw a non Bind IOException");
+              throw ex;
+            }
+          } catch (MultiException ex) {
+            LOG.info("HttpServer.start() threw a MultiException");
+            throw ex;
+          }
+          // Reached only after a BindException with findPort set: try the next port.
+          listener.setPort((oriPort += 1));
+        }
+      }
+      // Make sure there is no handler failures.
+      Handler[] handlers = webServer.getHandlers();
+      for (int i = 0; i < handlers.length; i++) {
+        if (handlers[i].isFailed()) {
+          throw new IOException(
+              "Problem in starting http server. Server handlers failed");
+        }
+      }
+    } catch (IOException e) {
+      throw e;
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so that callers further up can observe it.
+      Thread.currentThread().interrupt();
+      throw (IOException) new InterruptedIOException(
+          "Interrupted while starting HTTP server").initCause(e);
+    } catch (Exception e) {
+      throw new IOException("Problem starting http server", e);
+    }
+  }
+
+  /**
+   * stop the server
+   *
+   * <p>The listener, the web app context and the web server are each stopped even if
+   * an earlier step fails; failures are logged, collected and thrown at the end.</p>
+   *
+   * @throws Exception if any of the stop steps failed
+   */
+  public void stop() throws Exception {
+    MultiException exception = null;
+    try {
+      listener.close();
+    } catch (Exception e) {
+      LOG.error(
+          "Error while stopping listener for webapp"
+              + webAppContext.getDisplayName(), e);
+      exception = addMultiException(exception, e);
+    }
+
+    try {
+      // clear & stop webAppContext attributes to avoid memory leaks.
+      webAppContext.clearAttributes();
+      webAppContext.stop();
+    } catch (Exception e) {
+      LOG.error("Error while stopping web app context for webapp "
+          + webAppContext.getDisplayName(), e);
+      exception = addMultiException(exception, e);
+    }
+    try {
+      webServer.stop();
+    } catch (Exception e) {
+      LOG.error(
+          "Error while stopping web server for webapp "
+              + webAppContext.getDisplayName(), e);
+      exception = addMultiException(exception, e);
+    }
+
+    if (exception != null) {
+      exception.ifExceptionThrow();
+    }
+
+  }
+
+  /** Adds {@code e} to {@code exception}, creating the MultiException on first use. */
+  private MultiException addMultiException(MultiException exception, Exception e) {
+    if (exception == null) {
+      exception = new MultiException();
+    }
+    exception.add(e);
+    return exception;
+  }
+
+  /** Blocks until the underlying web server has stopped. */
+  public void join() throws InterruptedException {
+    webServer.join();
+  }
+
+  /**
+   * Test for the availability of the web server
+   *
+   * @return true if the web server is started, false otherwise
+   */
+  public boolean isAlive() {
+    return webServer != null && webServer.isStarted();
+  }
+
+  /**
+   * Return the host and port of the HttpServer, if live
+   *
+   * @return the classname and any HTTP URL
+   */
+  @Override
+  public String toString() {
+    return listener != null ? ("HttpServer at http://" + listener.getHost()
+        + ":" + listener.getLocalPort() + "/" + (isAlive() ? STATE_DESCRIPTION_ALIVE
+        : STATE_DESCRIPTION_NOT_LIVE))
+        : "Inactive HttpServer";
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
new file mode 100644
index 0000000..faeadaf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
@@ -0,0 +1,376 @@
+package org.apache.tajo.webapp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.client.QueryStatus;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.jdbc.TajoResultSet;
+import org.apache.tajo.util.JSPUtil;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class QueryExecutorServlet extends HttpServlet {
+  private static final Log LOG = LogFactory.getLog(QueryExecutorServlet.class);
+
+  ObjectMapper om = new ObjectMapper();
+
+  //queryRunnerId -> QueryRunner
+  private final Map<String, QueryRunner> queryRunners = new HashMap<String, QueryRunner>();
+
+  private TajoClient tajoClient;
+
+  private ExecutorService queryRunnerExecutor = Executors.newFixedThreadPool(5);
+
+  private QueryRunnerCleaner queryRunnerCleaner;
+  @Override
+  public void init(ServletConfig config) throws ServletException {
+    om.getDeserializationConfig().disable(
+        DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES);
+
+    try {
+      tajoClient = new TajoClient(new TajoConf());
+
+      queryRunnerCleaner = new QueryRunnerCleaner();
+      queryRunnerCleaner.start();
+    } catch (IOException e) {
+      LOG.error(e.getMessage());
+    }
+  }
+
+  @Override
+  public void service(HttpServletRequest request,
+                      HttpServletResponse response) throws ServletException, IOException {
+    String action = request.getParameter("action");
+    Map<String, Object> returnValue = new HashMap<String, Object>();
+    try {
+      if(tajoClient == null) {
+        errorResponse(response, "TajoClient not initialized");
+        return;
+      }
+      if(action == null || action.trim().isEmpty()) {
+        errorResponse(response, "no action parameter.");
+        return;
+      }
+
+      if("runQuery".equals(action)) {
+        String query = request.getParameter("query");
+        if(query == null || query.trim().isEmpty()) {
+          errorResponse(response, "No query parameter");
+          return;
+        }
+        String queryRunnerId = null;
+        while(true) {
+          synchronized(queryRunners) {
+            queryRunnerId = "" + System.currentTimeMillis();
+            if(!queryRunners.containsKey(queryRunnerId)) {
+              break;
+            }
+            try {
+              Thread.sleep(100);
+            } catch (InterruptedException e) {
+            }
+          }
+        }
+        QueryRunner queryRunner = new QueryRunner(queryRunnerId, query);
+        try {
+          queryRunner.sizeLimit = Integer.parseInt(request.getParameter("limitSize"));
+        } catch (java.lang.NumberFormatException nfe) {
+          queryRunner.sizeLimit = 1048576;
+        }
+        synchronized(queryRunners) {
+          queryRunners.put(queryRunnerId, queryRunner);
+        }
+        queryRunnerExecutor.submit(queryRunner);
+        returnValue.put("queryRunnerId", queryRunnerId);
+      } else if("getQueryProgress".equals(action)) {
+        synchronized(queryRunners) {
+          String queryRunnerId = request.getParameter("queryRunnerId");
+          QueryRunner queryRunner = queryRunners.get(queryRunnerId);
+          if(queryRunner == null) {
+            errorResponse(response, "No query info:" + queryRunnerId);
+            return;
+          }
+          if(queryRunner.error != null) {
+            errorResponse(response, queryRunner.error);
+            return;
+          }
+          SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+          returnValue.put("progress", queryRunner.progress);
+          returnValue.put("startTime", df.format(queryRunner.startTime));
+          returnValue.put("finishTime", queryRunner.finishTime == 0 ? "-" : df.format(queryRunner.startTime));
+          returnValue.put("runningTime", JSPUtil.getElapsedTime(queryRunner.startTime, queryRunner.finishTime));
+        }
+      } else if("getQueryResult".equals(action)) {
+        synchronized(queryRunners) {
+          String queryRunnerId = request.getParameter("queryRunnerId");
+          QueryRunner queryRunner = queryRunners.get(queryRunnerId);
+          if(queryRunner == null) {
+            errorResponse(response, "No query info:" + queryRunnerId);
+            return;
+          }
+          if(queryRunner.error != null) {
+            errorResponse(response, queryRunner.error);
+            return;
+          }
+          returnValue.put("numOfRows", queryRunner.numOfRows);
+          returnValue.put("resultSize", queryRunner.resultSize);
+          returnValue.put("resultData", queryRunner.queryResult);
+          returnValue.put("resultColumns", queryRunner.columnNames);
+          returnValue.put("runningTime", JSPUtil.getElapsedTime(queryRunner.startTime, queryRunner.finishTime));
+        }
+      } else if("clearAllQueryRunner".equals(action)) {
+        synchronized(queryRunners) {
+          for(QueryRunner eachQueryRunner: queryRunners.values()) {
+            eachQueryRunner.setStop();
+          }
+          queryRunners.clear();
+        }
+      }
+      returnValue.put("success", "true");
+      writeHttpResponse(response, returnValue);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      errorResponse(response, e);
+    }
+  }
+
+  private void errorResponse(HttpServletResponse response, Exception e) throws IOException {
+    errorResponse(response, e.getMessage() + "\n" + StringUtils.stringifyException(e));
+  }
+
+  private void errorResponse(HttpServletResponse response, String message) throws IOException {
+    Map<String, Object> errorMessage = new HashMap<String, Object>();
+    errorMessage.put("success", "false");
+    errorMessage.put("errorMessage", message);
+    writeHttpResponse(response, errorMessage);
+  }
+
+  private void writeHttpResponse(HttpServletResponse response, Map<String, Object> outputMessage) throws IOException {
+    response.setContentType("text/html");
+
+    OutputStream out = response.getOutputStream();
+    out.write(om.writeValueAsBytes(outputMessage));
+
+    out.flush();
+    out.close();
+  }
+
+  class QueryRunnerCleaner extends Thread {
+    public void run() {
+      List<QueryRunner> queryRunnerList;
+      synchronized(queryRunners) {
+        queryRunnerList = new ArrayList<QueryRunner>(queryRunners.values());
+        for(QueryRunner eachQueryRunner: queryRunnerList) {
+          if(!eachQueryRunner.running.get() &&
+              (System.currentTimeMillis() - eachQueryRunner.finishTime > 180 * 1000)) {
+            queryRunners.remove(eachQueryRunner.queryRunnerId);
+          }
+        }
+      }
+    }
+  }
+
+  class QueryRunner extends Thread {
+    long startTime;
+    long finishTime;
+
+    String queryRunnerId;
+
+    ClientProtos.SubmitQueryResponse queryRespons;
+    AtomicBoolean running = new AtomicBoolean(true);
+    AtomicBoolean stop = new AtomicBoolean(false);
+    QueryId queryId;
+    String query;
+    long resultSize;
+    int sizeLimit;
+    long numOfRows;
+    Exception error;
+
+    AtomicInteger progress = new AtomicInteger(0);
+
+    List<String> columnNames = new ArrayList<String>();
+
+    List<List<Object>> queryResult;
+
+    public QueryRunner(String queryRunnerId, String query) {
+      this.queryRunnerId = queryRunnerId;
+      this.query = query;
+    }
+
+    public void setStop() {
+      this.stop.set(true);
+      this.interrupt();
+    }
+
+    public void run() {
+      startTime = System.currentTimeMillis();
+      try {
+        queryRespons = tajoClient.executeQuery(query);
+        if (queryRespons.getResultCode() == ClientProtos.ResultCode.OK) {
+          QueryId queryId = null;
+          try {
+            queryId = new QueryId(queryRespons.getQueryId());
+            getQueryResult(queryId);
+          } finally {
+            if (queryId != null) {
+              tajoClient.closeQuery(queryId);
+            }
+          }
+        } else {
+          LOG.error("queryRespons.getResultCode() not OK:" + queryRespons.getResultCode());
+          error = new Exception("queryRespons.getResultCode() not OK:" + queryRespons.getResultCode());
+        }
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        error = e;
+      } finally {
+        running.set(false);
+        finishTime = System.currentTimeMillis();
+      }
+    }
+
+    private void getQueryResult(QueryId tajoQueryId) {
+      // query execute
+      try {
+        QueryStatus status = null;
+
+        while (!stop.get()) {
+          try {
+            Thread.sleep(1000);
+          } catch(InterruptedException e) {
+            break;
+          }
+          status = tajoClient.getQueryStatus(tajoQueryId);
+          if (status.getState() == TajoProtos.QueryState.QUERY_MASTER_INIT
+              || status.getState() == TajoProtos.QueryState.QUERY_MASTER_LAUNCHED) {
+            continue;
+          }
+
+          if (status.getState() == TajoProtos.QueryState.QUERY_RUNNING
+              || status.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+            int progressValue = (int) (status.getProgress() * 100.0f);
+            if(progressValue == 100)  {
+              progressValue = 99;
+            }
+            progress.set(progressValue);
+          }
+          if (status.getState() != TajoProtos.QueryState.QUERY_RUNNING
+              && status.getState() != TajoProtos.QueryState.QUERY_NOT_ASSIGNED) {
+            break;
+          }
+        }
+
+        if(status == null) {
+          LOG.error("Query Status is null");
+          error = new Exception("Query Status is null");
+          return;
+        }
+        if (status.getState() == TajoProtos.QueryState.QUERY_ERROR ||
+            status.getState() == TajoProtos.QueryState.QUERY_FAILED) {
+          error = new Exception(status.getErrorMessage());
+        } else if (status.getState() == TajoProtos.QueryState.QUERY_KILLED) {
+          LOG.info(queryId + " is killed.");
+          error = new Exception(queryId + " is killed.");
+        } else {
+          if (status.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+            if (status.hasResult()) {
+              ResultSet res = null;
+              try {
+                ClientProtos.GetQueryResultResponse response = tajoClient.getResultResponse(tajoQueryId);
+                TableDesc desc = CatalogUtil.newTableDesc(response.getTableDesc());
+                tajoClient.getConf().setVar(TajoConf.ConfVars.USERNAME, response.getTajoUserName());
+                res = new TajoResultSet(tajoClient, queryId, tajoClient.getConf(), desc);
+
+                ResultSetMetaData rsmd = res.getMetaData();
+                resultSize = desc.getStats().getNumBytes();
+                LOG.info("Tajo Query Result: " + desc.getPath() + "\n");
+
+                int numOfColumns = rsmd.getColumnCount();
+                for(int i = 0; i < numOfColumns; i++) {
+                  columnNames.add(rsmd.getColumnName(i + 1));
+                }
+                queryResult = new ArrayList<List<Object>>();
+
+                if(sizeLimit < resultSize) {
+                    numOfRows = (long)((float)(desc.getStats().getNumRows()) * ((float)sizeLimit / (float)resultSize));
+                } else {
+                    numOfRows = desc.getStats().getNumRows();
+                }
+                int rowCount = 0;
+                boolean hasMoreData = false;
+                while (res.next()) {
+                  if(rowCount > numOfRows) {
+                    hasMoreData = true;
+                    break;
+                  }
+                  List<Object> row = new ArrayList<Object>();
+                  for(int i = 0; i < numOfColumns; i++) {
+                    row.add(res.getObject(i + 1).toString());
+                  }
+                  queryResult.add(row);
+                  rowCount++;
+
+                }
+              } finally {
+                if (res != null) {
+                  res.close();
+                }
+                progress.set(100);
+              }
+            } else {
+              error = new Exception(queryId + " no result");
+            }
+          }
+        }
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        error = e;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java b/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
new file mode 100644
index 0000000..09426e0
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.webapp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.worker.TajoWorker;
+import org.mortbay.jetty.Connector;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+
+public class StaticHttpServer extends HttpServer {
+  private static StaticHttpServer instance = null;
+
+  private StaticHttpServer(Object containerObject , String name, String bindAddress, int port,
+      boolean findPort, Connector connector, Configuration conf,
+      String[] pathSpecs) throws IOException {
+    super( name, bindAddress, port, findPort, connector, conf, pathSpecs);
+  }
+  public static StaticHttpServer getInstance() {
+    return instance;
+  }
+  public static StaticHttpServer getInstance(Object containerObject, String name,
+      String bindAddress, int port, boolean findPort, Connector connector,
+      TajoConf conf,
+      String[] pathSpecs) throws IOException {
+    String addr = bindAddress;
+    if(instance == null) {
+      if(bindAddress == null || bindAddress.compareTo("") == 0) {
+        if (containerObject instanceof TajoMaster) {
+          addr = conf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS).split(":")[0];
+        } else if (containerObject instanceof TajoWorker) {
+          addr = Inet4Address.getLocalHost().getHostName();
+        }
+      }
+      
+      instance = new StaticHttpServer(containerObject, name, addr, port,
+          findPort, connector, conf, pathSpecs);
+      instance.setAttribute("tajo.info.server.object", containerObject);
+      instance.setAttribute("tajo.info.server.addr", addr);
+      instance.setAttribute("tajo.info.server.conf", conf);
+      instance.setAttribute("tajo.info.server.starttime", System.currentTimeMillis());
+    }
+    return instance;
+  }
+
+  public void set(String name, Object object) {
+    instance.setAttribute(name, object);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
new file mode 100644
index 0000000..55aa8c4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tajo.master.ContainerProxy;
+
+import java.util.Map;
+
+/**
+ * Base class for resource allocators that keeps the container proxies known to
+ * the allocator, keyed by container id, in a concurrent map.
+ */
+public abstract class AbstractResourceAllocator extends CompositeService implements ResourceAllocator {
+  private final Map<ContainerId, ContainerProxy> containers = Maps.newConcurrentMap();
+
+  public AbstractResourceAllocator() {
+    super(AbstractResourceAllocator.class.getName());
+  }
+
+  /** Registers {@code container} under {@code cId}, replacing any previous entry. */
+  public void addContainer(ContainerId cId, ContainerProxy container) {
+    this.containers.put(cId, container);
+  }
+
+  /** Forgets the container registered under {@code cId}, if any. */
+  public void removeContainer(ContainerId cId) {
+    this.containers.remove(cId);
+  }
+
+  /** @return whether a container is registered under {@code cId} */
+  public boolean containsContainer(ContainerId cId) {
+    return this.containers.containsKey(cId);
+  }
+
+  /** @return the container registered under {@code cId}, or {@code null} if there is none */
+  public ContainerProxy getContainer(ContainerId cId) {
+    return this.containers.get(cId);
+  }
+
+  /** @return the backing map itself, not a copy */
+  public Map<ContainerId, ContainerProxy> getContainers() {
+    return this.containers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/DeletionService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/DeletionService.java b/tajo-core/src/main/java/org/apache/tajo/worker/DeletionService.java
new file mode 100644
index 0000000..42ea71f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/DeletionService.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+
+import java.io.IOException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Deletes paths on the local file system in the background, using a scheduled
+ * thread pool. Each request is delayed by {@code debugDelay} seconds; a delay
+ * of {@code -1} turns deletion off entirely.
+ */
+public class DeletionService {
+  static final Log LOG = LogFactory.getLog(DeletionService.class);
+
+  private int debugDelay;
+  private ScheduledThreadPoolExecutor sched;
+  private final FileContext lfs = getLfs();
+
+  /**
+   * @return the file context of the local file system
+   * @throws RuntimeException if the local file system is not supported
+   */
+  static final FileContext getLfs() {
+    try {
+      return FileContext.getLocalFSFileContext();
+    } catch (UnsupportedFileSystemException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * @param defaultThreads core size of the deletion thread pool
+   * @param debugDelay     delay in seconds before a requested deletion runs;
+   *                       {@code -1} disables deletion
+   */
+  public DeletionService(int defaultThreads, int debugDelay) {
+    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat("DeletionService #%d").build();
+
+    sched = new ScheduledThreadPoolExecutor(defaultThreads, tf);
+    // Deletions still waiting for their delay are dropped at shutdown.
+    sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+    sched.setKeepAliveTime(60L, TimeUnit.SECONDS);
+    this.debugDelay = debugDelay;
+  }
+
+
+  /**
+   * Schedules the deletion of the given path(s). Nothing is scheduled when the
+   * debug delay is {@code -1}.
+   *
+   * @param subDir   the sub directory name
+   * @param baseDirs the base directories which contains the subDir's
+   */
+  public void delete(Path subDir, Path... baseDirs) {
+    if (debugDelay != -1) {
+      sched.schedule(new FileDeletion(subDir, baseDirs), debugDelay, TimeUnit.SECONDS);
+    }
+  }
+
+  /**
+   * Shuts the pool down, waits up to 10 seconds for running deletions and then
+   * forces termination if they have not finished.
+   */
+  public void stop() {
+    sched.shutdown();
+    boolean terminated = false;
+    try {
+      terminated = sched.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller; the pool is force-stopped below.
+      Thread.currentThread().interrupt();
+    }
+    if (!terminated) {
+      sched.shutdownNow();
+    }
+  }
+
+  /** Task that recursively deletes one path, or one sub directory under several base directories. */
+  private class FileDeletion implements Runnable {
+    final Path subDir;
+    final Path[] baseDirs;
+
+    FileDeletion(Path subDir, Path[] baseDirs) {
+      this.subDir = subDir;
+      this.baseDirs = baseDirs;
+    }
+
+    @Override
+    public void run() {
+
+      // Without base directories, subDir itself is the path to delete.
+      if (baseDirs == null || baseDirs.length == 0) {
+        LOG.debug("Worker deleting absolute path : " + subDir);
+        try {
+          lfs.delete(subDir, true);
+        } catch (IOException e) {
+          LOG.warn("Failed to delete " + subDir, e);
+        }
+        return;
+      }
+      for (Path baseDir : baseDirs) {
+        // A null subDir means the base directory itself is deleted.
+        Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
+        LOG.debug("Worker deleting path : " + del);
+        try {
+          lfs.delete(del, true);
+        } catch (IOException e) {
+          // Report the path that actually failed, not just the sub directory name.
+          LOG.warn("Failed to delete " + del, e);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
new file mode 100644
index 0000000..bb136f7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.*;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.channels.FileChannel;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+/**
+ * Fetcher fetches data from a given uri via HTTP protocol and stores them into
+ * a specific file. It aims at asynchronous and efficient data transmit.
+ */
+public class Fetcher {
+  private final static Log LOG = LogFactory.getLog(Fetcher.class);
+
+  private final URI uri;
+  private final File file;
+
+  private final String host;
+  private int port;
+
+  // Both timestamps are 0 until get() has started / finished.
+  private long startTime;
+  private long finishTime;
+  private long fileLen;
+  private int messageReceiveCount;
+
+  private ClientBootstrap bootstrap;
+
+  /**
+   * @param uri     the location to fetch; scheme defaults to "http" and host to
+   *                "localhost" when absent
+   * @param file    the file the response body is written to
+   * @param factory the channel factory used for the connection
+   */
+  public Fetcher(URI uri, File file, ClientSocketChannelFactory factory) {
+    this.uri = uri;
+    this.file = file;
+
+    String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
+    this.host = uri.getHost() == null ? "localhost" : uri.getHost();
+    this.port = uri.getPort();
+    // Fall back to the scheme's well-known port. For any other scheme the port
+    // stays -1.
+    if (port == -1) {
+      if (scheme.equalsIgnoreCase("http")) {
+        this.port = 80;
+      } else if (scheme.equalsIgnoreCase("https")) {
+        this.port = 443;
+      }
+    }
+
+    bootstrap = new ClientBootstrap(factory);
+    bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
+    bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
+    bootstrap.setOption("tcpNoDelay", true);
+
+    ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(file);
+    bootstrap.setPipelineFactory(pipelineFactory);
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public long getFileLen() {
+    return fileLen;
+  }
+
+  public int getMessageReceiveCount() {
+    return messageReceiveCount;
+  }
+
+  /**
+   * @return "READY" before {@link #get()} was called, "FETCHING" while it is in
+   *         progress and "FINISH" afterwards
+   */
+  public String getStatus() {
+    if(startTime == 0) {
+      return "READY";
+    }
+
+    if(startTime > 0 && finishTime == 0) {
+      return "FETCHING";
+    } else {
+      return "FINISH";
+    }
+  }
+
+  /**
+   * Connects to the host, sends a GET request for the uri and blocks until the
+   * connection has been closed.
+   *
+   * @return the file given to the constructor
+   * @throws IOException if the connection attempt fails
+   */
+  public File get() throws IOException {
+    startTime = System.currentTimeMillis();
+
+    ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
+
+    // Wait until the connection attempt succeeds or fails.
+    Channel channel = future.awaitUninterruptibly().getChannel();
+    if (!future.isSuccess()) {
+      future.getChannel().close();
+      throw new IOException(future.getCause());
+    }
+
+    String query = uri.getPath()
+        + (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : "");
+    // Prepare the HTTP request.
+    HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query);
+    request.setHeader(HttpHeaders.Names.HOST, host);
+    LOG.info("Fetch: " + uri);
+    request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+    request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
+
+
+    // Send the HTTP request.
+    ChannelFuture channelFuture = channel.write(request);
+
+    // Wait for the server to close the connection.
+    channel.getCloseFuture().awaitUninterruptibly();
+
+    channelFuture.addListener(ChannelFutureListener.CLOSE);
+
+    // Close the channel to exit.
+    future.getChannel().close();
+    finishTime = System.currentTimeMillis();
+    return file;
+  }
+
+  public URI getURI() {
+    return this.uri;
+  }
+
+  /**
+   * Receives the HTTP response (and its chunks, if any) and appends the body
+   * to the target file.
+   */
+  class HttpClientHandler extends SimpleChannelUpstreamHandler {
+    private volatile boolean readingChunks;
+    private final File file;
+    private RandomAccessFile raf;
+    private FileChannel fc;
+    // Value of the Content-Length header, or -1 while none has been seen.
+    private long length = -1;
+
+    public HttpClientHandler(File file) throws FileNotFoundException {
+      this.file = file;
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      messageReceiveCount++;
+      try {
+        if (!readingChunks) {
+          HttpResponse response = (HttpResponse) e.getMessage();
+
+          StringBuilder sb = new StringBuilder();
+          if (LOG.isDebugEnabled()) {
+            sb.append("STATUS: ").append(response.getStatus())
+                .append(", VERSION: ").append(response.getProtocolVersion())
+                .append(", HEADER: ");
+          }
+          if (!response.getHeaderNames().isEmpty()) {
+            for (String name : response.getHeaderNames()) {
+              for (String value : response.getHeaders(name)) {
+                if (LOG.isDebugEnabled()) {
+                  sb.append(name).append(" = ").append(value);
+                }
+                // HTTP header names are case-insensitive.
+                if (this.length == -1 && HttpHeaders.Names.CONTENT_LENGTH.equalsIgnoreCase(name)) {
+                  this.length = Long.parseLong(value);
+                }
+              }
+            }
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(sb.toString());
+          }
+
+          if (response.getStatus() == HttpResponseStatus.NO_CONTENT) {
+            LOG.info("There are no data corresponding to the request");
+            return;
+          }
+
+          this.raf = new RandomAccessFile(file, "rw");
+          this.fc = raf.getChannel();
+
+          if (response.isChunked()) {
+            readingChunks = true;
+          } else {
+            ChannelBuffer content = response.getContent();
+            if (content.readable()) {
+              fc.write(content.toByteBuffer());
+            }
+          }
+        } else {
+          HttpChunk chunk = (HttpChunk) e.getMessage();
+          if (chunk.isLast()) {
+            readingChunks = false;
+            long fileLength = file.length();
+            if (fileLength == length) {
+              LOG.info("Data fetch is done (total received bytes: " + fileLength
+                  + ")");
+            } else {
+              LOG.info("Data fetch is done, but cannot get all data "
+                  + "(received/total: " + fileLength + "/" + length + ")");
+            }
+          } else {
+            fc.write(chunk.getContent().toByteBuffer());
+          }
+        }
+      } finally {
+        if(raf != null) {
+          fileLen = file.length();
+        }
+
+        // Release the file once no more body data is expected: either no chunks
+        // are pending, or the announced length has been reached. An unknown
+        // length (-1) must not count as "reached", otherwise a chunked response
+        // without Content-Length would be closed after its first message and
+        // every following chunk write would fail.
+        boolean transferDone = !readingChunks || (length >= 0 && fileLen >= length);
+        if(transferDone){
+          IOUtils.cleanup(LOG, fc, raf);
+
+        }
+      }
+    }
+  }
+
+  /** Builds the client pipeline: HTTP codec, content decompressor and the file-writing handler. */
+  class HttpClientPipelineFactory implements
+      ChannelPipelineFactory {
+    private final File file;
+
+    public HttpClientPipelineFactory(File file) {
+      this.file = file;
+    }
+
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+      ChannelPipeline pipeline = pipeline();
+
+      pipeline.addLast("codec", new HttpClientCodec());
+      pipeline.addLast("inflater", new HttpContentDecompressor());
+      pipeline.addLast("handler", new HttpClientHandler(file));
+      return pipeline;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
new file mode 100644
index 0000000..42ad875
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
+import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
+import org.apache.tajo.worker.dataserver.retriever.FileChunk;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Maps HTTP requests onto files below a base directory that was registered
+ * for a {@link QueryUnitId}.
+ *
+ * <p>A request URI must contain a query string with two parameters:
+ * {@code qid}, the string form of a registered id, and {@code fn}, the file
+ * name relative to that id's base directory. Values are taken verbatim from
+ * the URI (no URL decoding is performed).</p>
+ */
+@Deprecated
+public class InterDataRetriever implements DataRetriever {
+  private static final Log LOG = LogFactory.getLog(InterDataRetriever.class);
+  /** Ids currently registered; also the monitor guarding register/unregister. */
+  private final Set<QueryUnitId> registered = Sets.newHashSet();
+  /** Maps {@code QueryUnitId.toString()} to the base directory registered for it. */
+  private final Map<String, String> map = Maps.newConcurrentMap();
+
+  public InterDataRetriever() {
+  }
+
+  /**
+   * Associates a base directory with the given id. Registering an id that is
+   * already registered is a no-op; the first base directory is kept.
+   *
+   * @param id      the query unit id to register
+   * @param baseURI the base directory that {@code fn} values are resolved against
+   */
+  public void register(QueryUnitId id, String baseURI) {
+    synchronized (registered) {
+      if (!registered.contains(id)) {
+        map.put(id.toString(), baseURI);
+        registered.add(id);
+      }
+    }
+  }
+
+  /**
+   * Removes the registration of the given id; unknown ids are ignored.
+   *
+   * @param id the query unit id to unregister
+   */
+  public void unregister(QueryUnitId id) {
+    synchronized (registered) {
+      if (registered.contains(id)) {
+        map.remove(id.toString());
+        registered.remove(id);
+      }
+    }
+  }
+
+  /**
+   * Resolves the {@code qid} and {@code fn} parameters of the request to a
+   * regular file and returns a single chunk that spans the whole file.
+   *
+   * @param ctx     the channel handler context (not used by this implementation)
+   * @param request the HTTP request whose URI carries the query string
+   * @return a one-element array holding a chunk from offset 0 to the file length
+   * @throws IllegalArgumentException     if the URI has no query string
+   * @throws FileNotFoundException        if {@code qid} is missing or not
+   *                                      registered, if {@code fn} is missing,
+   *                                      or if the file is hidden or does not exist
+   * @throws FileAccessForbiddenException if {@code fn} resolves to a location
+   *                                      outside the base directory, or to
+   *                                      something that is not a regular file
+   * @throws IOException                  if a path cannot be canonicalized
+   */
+  @Override
+  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+      throws IOException {
+
+    String uri = request.getUri();
+    int start = uri.indexOf('?');
+    if (start < 0) {
+      throw new IllegalArgumentException("Wrong request: " + uri);
+    }
+
+    String queryStr = uri.substring(start + 1);
+    LOG.info("QUERY: " + queryStr);
+
+    String qid = null;
+    String fn = null;
+    for (String query : queryStr.split("&")) {
+      String [] kv = query.split("=");
+      if (kv.length < 2) {
+        // A parameter without a value ("qid" or "fn=") is treated as absent
+        // instead of failing with ArrayIndexOutOfBoundsException on kv[1].
+        continue;
+      }
+      if (kv[0].equals("qid")) {
+        qid = kv[1];
+      } else if (kv[0].equals("fn")) {
+        fn = kv[1];
+      }
+    }
+
+    // A concurrent map rejects null keys, so a missing qid is reported the
+    // same way as an unknown one rather than as a NullPointerException.
+    String baseDir = (qid == null) ? null : map.get(qid);
+    if (baseDir == null) {
+      throw new FileNotFoundException("No such qid: " + qid);
+    }
+    if (fn == null) {
+      throw new FileNotFoundException("No such file: " + baseDir + "/" + fn);
+    }
+
+    File file = new File(baseDir + "/" + fn);
+    // fn comes straight from the request, so refuse anything (e.g. "../..")
+    // that resolves outside the registered base directory. Symbolic links are
+    // resolved before the comparison.
+    if (!isInside(new File(baseDir), file)) {
+      throw new FileAccessForbiddenException(
+          "Access outside of the base directory is forbidden: " + fn);
+    }
+    if (file.isHidden() || !file.exists()) {
+      throw new FileNotFoundException("No such file: " + baseDir + "/"
+          + file.getName());
+    }
+    if (!file.isFile()) {
+      throw new FileAccessForbiddenException("No such file: "
+          + baseDir + "/" + file.getName());
+    }
+
+    return new FileChunk[] {new FileChunk(file, 0, file.length())};
+  }
+
+  /**
+   * Tells whether the canonical form of {@code target} lies strictly below
+   * the canonical form of {@code baseDir}.
+   */
+  private static boolean isInside(File baseDir, File target) throws IOException {
+    File canonicalBase = baseDir.getCanonicalFile();
+    File ancestor = target.getCanonicalFile().getParentFile();
+    while (ancestor != null) {
+      if (ancestor.equals(canonicalBase)) {
+        return true;
+      }
+      ancestor = ancestor.getParentFile();
+    }
+    return false;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
new file mode 100644
index 0000000..36e7353
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.worker.dataserver.retriever.FileChunk;
+import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link RetrieverHandler} that serves a whole file located under a fixed
+ * base directory. The file name is the first value of the {@code fn}
+ * request parameter.
+ */
+public class PartitionRetrieverHandler implements RetrieverHandler {
+  private final String baseDir;
+
+  /**
+   * @param baseDir directory against which requested file names are resolved
+   */
+  public PartitionRetrieverHandler(String baseDir) {
+    this.baseDir = baseDir;
+  }
+
+  /**
+   * Builds a chunk that spans the entire requested file.
+   *
+   * @param kvs request parameters; must contain at least one value for {@code fn}
+   * @return a chunk starting at offset 0 whose length is the file's length
+   */
+  @Override
+  public FileChunk get(Map<String, List<String>> kvs) throws IOException {
+    // The file is deliberately not validated here: AdvancedDataRetriever
+    // checks the validity of the file.
+    final String fileName = kvs.get("fn").get(0);
+    final File target = new File(baseDir + "/" + fileName);
+    final long wholeLength = target.length();
+    return new FileChunk(target, 0, wholeLength);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
new file mode 100644
index 0000000..be33a12
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.worker.dataserver.retriever.FileChunk;
+import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Retrieves the chunk of a data file that lies between a start key and an
+ * end key, using a BST index to translate the keys into file offsets.
+ * The start key is inclusive, but the end key is exclusive.
+ *
+ * Internally, there are four cases:
+ * <ul>
+ *   <li>out of scope: the index range does not overlap with the query range.</li>
+ *   <li>overlapped: the index range partially overlaps with the query range.</li>
+ *   <li>included: the index range is included between the start and end keys.</li>
+ *   <li>covered: the index range covers the query range (i.e., start and end keys).</li>
+ * </ul>
+ */
+public class RangeRetrieverHandler implements RetrieverHandler {
+  private static final Log LOG = LogFactory.getLog(RangeRetrieverHandler.class);
+  // Despite its name this is the output directory: the index is read from
+  // "<file>/index" and the data from "<file>/data/data".
+  private final File file;
+  private final BSTIndex.BSTIndexReader idxReader;
+  private final Schema schema;
+  private final TupleComparator comp;
+  // Decodes the Base64-decoded "start"/"end" request values into tuples.
+  private final RowStoreDecoder decoder;
+
+  /**
+   * Opens the BST index stored at {@code <outDir>/index} on the local file
+   * system and logs its first and last keys.
+   *
+   * @param outDir directory that holds the index and the data file
+   * @param schema schema passed to the index reader and used to build the tuple decoder
+   * @param comp   comparator passed to the index reader and used to compare keys
+   * @throws IOException propagated from resolving the index path or opening the index
+   */
+  public RangeRetrieverHandler(File outDir, Schema schema, TupleComparator comp) throws IOException {
+    this.file = outDir;
+    BSTIndex index = new BSTIndex(new TajoConf());
+    this.schema = schema;
+    this.comp = comp;
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    Path indexPath = fs.makeQualified(new Path(outDir.getCanonicalPath(), "index"));
+    this.idxReader =
+        index.getIndexReader(indexPath, this.schema, this.comp);
+    this.idxReader.open();
+    // NOTE(review): the logged message opens a parenthesis that is never closed.
+    LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
+        + idxReader.getLastKey());
+    this.decoder = RowStoreUtil.createDecoder(schema);
+  }
+
+  /**
+   * Computes the chunk of {@code <outDir>/data/data} that corresponds to the
+   * requested key range.
+   *
+   * @param kvs request parameters. The first values of {@code start} and
+   *            {@code end} are Base64 strings that are decoded into tuples;
+   *            the mere presence of the key {@code final} makes the chunk
+   *            extend to the end of the data file.
+   * @return the chunk between the resolved offsets, or {@code null} when the
+   *         index has neither a first nor a last key, or when the requested
+   *         range does not overlap the indexed range
+   * @throws IOException           if an index lookup fails (the requested range
+   *                               and index bounds are logged before rethrowing)
+   * @throws IllegalStateException if no start offset could be found
+   */
+  @Override
+  public FileChunk get(Map<String, List<String>> kvs) throws IOException {
+    // nothing to verify the file because AdvancedDataRetriever checks
+    // its validity of the file.
+    File data = new File(this.file, "data/data");
+    byte [] startBytes = Base64.decodeBase64(kvs.get("start").get(0));
+    Tuple start = decoder.toTuple(startBytes);
+    byte [] endBytes;
+    Tuple end;
+    endBytes = Base64.decodeBase64(kvs.get("end").get(0));
+    end = decoder.toTuple(endBytes);
+    boolean last = kvs.containsKey("final");
+
+    // When the comparator reports a non-ascending first key, the two bounds
+    // are swapped before they are compared against the index.
+    if(!comp.isAscendingFirstKey()) {
+      Tuple tmpKey = start;
+      start = end;
+      end = tmpKey;
+    }
+
+    LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
+        (last ? ", last=true" : "") + ")");
+
+    if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
+      LOG.info("There is no contents");
+      return null;
+    }
+
+    // Out of scope: the request ends before the first indexed key or starts
+    // after the last indexed key.
+    if (comp.compare(end, idxReader.getFirstKey()) < 0 ||
+        comp.compare(idxReader.getLastKey(), start) < 0) {
+      LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
+          "], but request start:" + start + ", end: " + end);
+      return null;
+    }
+
+    long startOffset;
+    long endOffset;
+    // First lookup of the start key; -1 is treated below as "not found".
+    try {
+      startOffset = idxReader.find(start);
+    } catch (IOException ioe) {
+      LOG.error("State Dump (the requested range: "
+          + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+          + idxReader.getLastKey());
+      throw ioe;
+    }
+    // Lookup of the end key, retried with the boolean flag set when the
+    // first attempt returns -1.
+    // NOTE(review): the flag presumably asks for the nearest key instead of
+    // an exact match -- confirm against BSTIndexReader#find.
+    try {
+      endOffset = idxReader.find(end);
+      if (endOffset == -1) {
+        endOffset = idxReader.find(end, true);
+      }
+    } catch (IOException ioe) {
+      LOG.error("State Dump (the requested range: "
+          + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+          + idxReader.getLastKey());
+      throw ioe;
+    }
+
+    // if startOffset == -1 then case 2-1 or case 3
+    // NOTE(review): the case numbers are not defined in this file; they
+    // presumably refer to the cases listed in the class comment -- confirm.
+    if (startOffset == -1) { // this is a hack
+      // if case 2-1 or case 3
+      try {
+        startOffset = idxReader.find(start, true);
+      } catch (IOException ioe) {
+        LOG.error("State Dump (the requested range: "
+            + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+            + idxReader.getLastKey());
+        throw ioe;
+      }
+    }
+
+    // Both lookups of the start key failed: give up with a state dump.
+    if (startOffset == -1) {
+      throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
+          "State Dump (the requested range: "
+          + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+          + idxReader.getLastKey());
+    }
+
+    // if greater than indexed values
+    // (or if the request carries "final"): read up to the end of the data file.
+    if (last || (endOffset == -1 && comp.compare(idxReader.getLastKey(), end) < 0)) {
+      endOffset = data.length();
+    }
+
+    // NOTE(review): if endOffset is still -1 here (neither condition above
+    // held), the computed chunk length is negative -- confirm this cannot occur.
+    FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
+    LOG.info("Retrieve File Chunk: " + chunk);
+    return chunk;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
new file mode 100644
index 0000000..8b9219c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+
+/**
+ * Contract for components that allocate worker resources.
+ * NOTE(review): the method descriptions below are derived from names and
+ * signatures only; confirm them against the implementing classes.
+ */
+public interface ResourceAllocator {
+  /** Presumably starts the allocation of a task worker; takes no arguments and returns nothing. */
+  void allocateTaskWorker();
+
+  /**
+   * Produces a {@link ContainerId} from its protobuf representation.
+   *
+   * @param containerId the protobuf form of the container id
+   * @return the corresponding container id
+   */
+  ContainerId makeContainerId(YarnProtos.ContainerIdProto containerId);
+
+  /**
+   * Computes how many containers to request.
+   *
+   * @param workerContext   the worker context the computation may consult
+   * @param numTasks        the number of tasks
+   * @param memoryMBPerTask presumably the memory per task in megabytes -- confirm
+   * @return the number of containers to request
+   */
+  int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
+                                    int numTasks,
+                                    int memoryMBPerTask);
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
new file mode 100644
index 0000000..1731854
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManagerFactory;
+
+import java.io.IOException;
+
+/**
+ * Thin facade over a {@link PhysicalPlanner}: it obtains the storage manager
+ * for a configuration, builds a {@link PhysicalPlannerImpl} on top of it, and
+ * forwards plan creation to that planner.
+ */
+public class TajoQueryEngine {
+
+  private final AbstractStorageManager storage;
+  private final PhysicalPlanner planner;
+
+  /**
+   * @param conf configuration used to look up the storage manager and to
+   *             construct the physical planner
+   * @throws IOException propagated from obtaining the storage manager or
+   *                     constructing the planner
+   */
+  public TajoQueryEngine(TajoConf conf) throws IOException {
+    final AbstractStorageManager manager = StorageManagerFactory.getStorageManager(conf);
+    this.storage = manager;
+    this.planner = new PhysicalPlannerImpl(conf, manager);
+  }
+
+  /**
+   * Delegates to the underlying physical planner.
+   *
+   * @param ctx  the task attempt context handed to the planner
+   * @param plan the logical plan to convert
+   * @return the physical executor created by the planner
+   * @throws InternalException propagated from the planner
+   */
+  public PhysicalExec createPlan(TaskAttemptContext ctx, LogicalNode plan)
+      throws InternalException {
+    final PhysicalExec executor = planner.createPlan(ctx, plan);
+    return executor;
+  }
+
+  /** Currently does nothing. */
+  public void stop() throws IOException {
+    // intentionally empty
+  }
+}