Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 13:44:22 UTC
[19/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core
into the top-level modules. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java
new file mode 100644
index 0000000..a32a913
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+
+public abstract class TajoMetricsReporter {
+ public abstract void report(SortedMap<String, Gauge> gauges,
+ SortedMap<String, Counter> counters,
+ SortedMap<String, Histogram> histograms,
+ SortedMap<String, Meter> meters,
+ SortedMap<String, Timer> timers);
+
+ public <T> Map<String, Map<String, T>> findMetricsItemGroup(SortedMap<String, T> metricsMap) {
+ Map<String, Map<String, T>> metricsGroup = new HashMap<String, Map<String, T>>();
+
+ String previousGroup = null;
+ Map<String, T> groupItems = new HashMap<String, T>();
+
+ for (Map.Entry<String, T> entry : metricsMap.entrySet()) {
+ String key = entry.getKey();
+ String[] keyTokens = key.split("\\.");
+
+ String groupName = null;
+ String itemName = null;
+
+ if (keyTokens.length > 2) {
+ groupName = keyTokens[0] + "." + keyTokens[1];
+ itemName = "";
+ String prefix = "";
+ for (int i = 2; i < keyTokens.length; i++) {
+ itemName += prefix + keyTokens[i];
+ prefix = ".";
+ }
+ } else {
+ groupName = "";
+ itemName = key;
+ if(!metricsGroup.containsKey(groupName)) {
+ metricsGroup.put(groupName, new HashMap<String, T>());
+ }
+ metricsGroup.get(groupName).put(itemName, entry.getValue());
+ continue;
+ }
+
+ if (previousGroup != null && !previousGroup.equals(groupName)) {
+ metricsGroup.put(previousGroup, groupItems);
+ groupItems = new HashMap<String, T>();
+ }
+ groupItems.put(itemName, entry.getValue());
+ previousGroup = groupName;
+ }
+
+ if(groupItems != null && !groupItems.isEmpty()) {
+ metricsGroup.put(previousGroup, groupItems);
+ }
+
+ return metricsGroup;
+ }
+
+ protected String meterGroupToString(String dateTime, String hostAndPort, double rateFactor,
+ String groupName, Map<String, Meter> meters) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(dateTime).append(" ");
+ if(hostAndPort != null && !hostAndPort.isEmpty()) {
+ sb.append(hostAndPort).append(" ");
+ }
+ sb.append("meter").append(" ");
+
+ if(!groupName.isEmpty()) {
+ sb.append(groupName).append(" ");
+ }
+ String prefix = "";
+ for(Map.Entry<String, Meter> eachMeter: meters.entrySet()) {
+ String key = eachMeter.getKey();
+ Meter meter = eachMeter.getValue();
+ sb.append(prefix);
+ sb.append(key).append(".count=").append(meter.getCount()).append("|");
+ sb.append(key).append(".mean=").append(String.format("%2.2f",
+ convertRate(meter.getMeanRate(), rateFactor))).append("|");
+ sb.append(key).append(".1minute=").append(String.format("%2.2f",
+ convertRate(meter.getOneMinuteRate(), rateFactor))).append("|");
+ sb.append(key).append(".5minute=").append(String.format("%2.2f",
+ convertRate(meter.getFiveMinuteRate(), rateFactor))).append("|");
+ sb.append(key).append(".15minute=").append(String.format("%2.2f",
+ convertRate(meter.getFifteenMinuteRate(), rateFactor)));
+ prefix = ",";
+ }
+
+ return sb.toString();
+ }
+
+ protected String counterGroupToString(String dateTime, String hostAndPort, double rateFactor,
+ String groupName, Map<String, Counter> counters) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(dateTime).append(" ");
+ if(hostAndPort != null && !hostAndPort.isEmpty()) {
+ sb.append(hostAndPort).append(" ");
+ }
+ sb.append("counter").append(" ");
+
+ if(!groupName.isEmpty()) {
+ sb.append(groupName).append(" ");
+ }
+ String prefix = "";
+ for(Map.Entry<String, Counter> eachCounter: counters.entrySet()) {
+ sb.append(prefix);
+ sb.append(eachCounter.getKey()).append("=").append(eachCounter.getValue().getCount());
+ prefix = ",";
+
+ }
+ return sb.toString();
+ }
+
+ protected String gaugeGroupToString(String dateTime, String hostAndPort, double rateFactor,
+ String groupName, Map<String, Gauge> gauges) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(dateTime).append(" ");
+ if(hostAndPort != null && !hostAndPort.isEmpty()) {
+ sb.append(hostAndPort).append(" ");
+ }
+ sb.append("guage").append(" ");
+
+ if(!groupName.isEmpty()) {
+ sb.append(groupName).append(" ");
+ }
+ String prefix = "";
+ for(Map.Entry<String, Gauge> eachGauge: gauges.entrySet()) {
+ sb.append(prefix).append(eachGauge.getKey()).append("=").append(eachGauge.getValue().getValue());
+ prefix = ",";
+ }
+ return sb.toString();
+ }
+
+ protected String histogramGroupToString(String dateTime, String hostAndPort, double rateFactor,
+ String groupName, Map<String, Histogram> histograms) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(dateTime).append(" ");
+ if(hostAndPort != null && !hostAndPort.isEmpty()) {
+ sb.append(hostAndPort).append(" ");
+ }
+ sb.append("histo").append(" ");
+
+ String prefix = "";
+ for(Map.Entry<String, Histogram> eachHistogram: histograms.entrySet()) {
+ String key = eachHistogram.getKey();
+ Histogram histogram = eachHistogram.getValue();
+ sb.append(prefix);
+ sb.append(key).append(".count=").append(histogram.getCount()).append("|");
+
+ Snapshot snapshot = histogram.getSnapshot();
+
+ sb.append(key).append(".min=").append(snapshot.getMin()).append("|");
+ sb.append(key).append(".max=").append(snapshot.getMax()).append("|");
+ sb.append(key).append(".mean=").append(String.format("%2.2f", snapshot.getMean())).append("|");
+ sb.append(key).append(".stddev=").append(String.format("%2.2f", snapshot.getStdDev())).append("|");
+ sb.append(key).append(".median=").append(String.format("%2.2f", snapshot.getMedian())).append("|");
+ sb.append(key).append(".75%=").append(String.format("%2.2f", snapshot.get75thPercentile())).append("|");
+ sb.append(key).append(".95%=").append(String.format("%2.2f", snapshot.get95thPercentile())).append("|");
+ sb.append(key).append(".98%=").append(String.format("%2.2f", snapshot.get98thPercentile())).append("|");
+ sb.append(key).append(".99%=").append(String.format("%2.2f", snapshot.get99thPercentile())).append("|");
+ sb.append(key).append(".999%=").append(String.format("%2.2f", snapshot.get999thPercentile()));
+ prefix = ",";
+ }
+ return sb.toString();
+ }
+
+ protected String timerGroupToString(String dateTime, String hostAndPort, double rateFactor,
+ String groupName, Map<String, Timer> timers) {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append(dateTime).append(" ");
+ if(hostAndPort != null && !hostAndPort.isEmpty()) {
+ sb.append(hostAndPort).append(" ");
+ }
+ sb.append("timer").append(" ");
+
+ if(!groupName.isEmpty()) {
+ sb.append(groupName).append(" ");
+ }
+ String prefix = "";
+ for(Map.Entry<String, Timer> eachTimer: timers.entrySet()) {
+ String key = eachTimer.getKey();
+ Timer timer = eachTimer.getValue();
+ Snapshot snapshot = timer.getSnapshot();
+
+ sb.append(prefix);
+ sb.append(key).append(".count=").append(timer.getCount()).append("|");
+ sb.append(key).append(".meanrate=").append(String.format("%2.2f", convertRate(timer.getMeanRate(), rateFactor))).append("|");
+ sb.append(key).append(".1minuterate=").append(String.format("%2.2f", convertRate(timer.getOneMinuteRate(), rateFactor))).append("|");
+ sb.append(key).append(".5minuterate=").append(String.format("%2.2f", convertRate(timer.getFiveMinuteRate(), rateFactor))).append("|");
+ sb.append(key).append(".15minuterate=").append(String.format("%2.2f", convertRate(timer.getFifteenMinuteRate(),rateFactor))).append("|");
+ sb.append(key).append(".min=").append(String.format("%2.2f", convertRate(snapshot.getMin(), rateFactor))).append("|");
+ sb.append(key).append(".max=").append(String.format("%2.2f", convertRate(snapshot.getMax(),rateFactor))).append("|");
+ sb.append(key).append(".mean=").append(String.format("%2.2f", convertRate(snapshot.getMean(), rateFactor))).append("|");
+ sb.append(key).append(".stddev=").append(String.format("%2.2f", convertRate(snapshot.getStdDev(),rateFactor))).append("|");
+ sb.append(key).append(".median=").append(String.format("%2.2f", convertRate(snapshot.getMedian(), rateFactor))).append("|");
+ sb.append(key).append(".75%=").append(String.format("%2.2f", convertRate(snapshot.get75thPercentile(), rateFactor))).append("|");
+ sb.append(key).append(".95%=").append(String.format("%2.2f", convertRate(snapshot.get95thPercentile(), rateFactor))).append("|");
+ sb.append(key).append(".98%=").append(String.format("%2.2f", convertRate(snapshot.get98thPercentile(), rateFactor))).append("|");
+ sb.append(key).append(".99%=").append(String.format("%2.2f", convertRate(snapshot.get99thPercentile(), rateFactor))).append("|");
+ sb.append(key).append(".999%=").append(String.format("%2.2f", convertRate(snapshot.get999thPercentile(),rateFactor)));
+ prefix = ",";
+ }
+
+ return sb.toString();
+ }
+
+ protected double convertRate(double rate, double rateFactor) {
+ return rate * rateFactor;
+ }
+
+}
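
For orientation, a concrete reporter only has to implement report() and can lean on findMetricsItemGroup() plus the *GroupToString helpers above. A minimal sketch, assuming a hypothetical LoggingMetricsReporter placed in the same package (the stdout output and date format are illustrative, not part of this commit):

    import com.codahale.metrics.*;

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Map;
    import java.util.SortedMap;

    public class LoggingMetricsReporter extends TajoMetricsReporter {
      @Override
      public void report(SortedMap<String, Gauge> gauges,
                         SortedMap<String, Counter> counters,
                         SortedMap<String, Histogram> histograms,
                         SortedMap<String, Meter> meters,
                         SortedMap<String, Timer> timers) {
        String dateTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        // A key such as "worker.querymaster.task.count" is grouped under
        // "worker.querymaster" with the item name "task.count".
        for (Map.Entry<String, Map<String, Counter>> group :
            findMetricsItemGroup(counters).entrySet()) {
          System.out.println(counterGroupToString(dateTime, null, 1.0,
              group.getKey(), group.getValue()));
        }
        for (Map.Entry<String, Map<String, Meter>> group :
            findMetricsItemGroup(meters).entrySet()) {
          System.out.println(meterGroupToString(dateTime, null, 1.0,
              group.getKey(), group.getValue()));
        }
      }
    }
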
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
new file mode 100644
index 0000000..f11d520
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.metrics.GroupNameMetricsFilter;
+import org.apache.tajo.util.metrics.MetricsFilterList;
+import org.apache.tajo.util.metrics.RegexpMetricsFilter;
+
+import java.io.Closeable;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class TajoMetricsScheduledReporter extends TajoMetricsReporter implements Closeable {
+ private static final Log LOG = LogFactory.getLog(TajoMetricsScheduledReporter.class);
+
+ public static final String PERIOD_KEY = "period";
+
+ protected MetricRegistry registry;
+ protected ScheduledExecutorService executor;
+ protected MetricFilter filter;
+ protected double durationFactor;
+ protected String durationUnit;
+ protected double rateFactor;
+ protected String rateUnit;
+ protected Map<String, String> metricsProperties;
+ protected String metricsName;
+ protected String metricsPropertyKey;
+ protected String hostAndPort;
+ protected long period;
+
+ protected abstract String getReporterName();
+ protected abstract void afterInit();
+
+ private static class NamedThreadFactory implements ThreadFactory {
+ private final ThreadGroup group;
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+ private final String namePrefix;
+
+ private NamedThreadFactory(String name) {
+ final SecurityManager s = System.getSecurityManager();
+ this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+ this.namePrefix = "metrics-" + name + "-thread-";
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
+ t.setDaemon(true);
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+ return t;
+ }
+ }
+
+ public long getPeriod() {
+ return period;
+ }
+
+ public void init(MetricRegistry registry,
+ String metricsName,
+ String hostAndPort,
+ Map<String, String> metricsProperties) {
+ this.registry = registry;
+ this.metricsName = metricsName;
+ this.executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(metricsName));
+ this.rateFactor = TimeUnit.SECONDS.toSeconds(1);
+ this.rateUnit = calculateRateUnit(TimeUnit.MILLISECONDS);
+ this.durationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
+ this.durationUnit = TimeUnit.MILLISECONDS.toString().toLowerCase(Locale.US);
+ this.metricsProperties = metricsProperties;
+ this.metricsPropertyKey = metricsName + "." + getReporterName() + ".";
+ this.hostAndPort = hostAndPort;
+
+ MetricsFilterList filterList = new MetricsFilterList();
+ filterList.addMetricFilter(new GroupNameMetricsFilter(metricsName));
+
+ String regexpFilterKey = metricsPropertyKey + "regexp.";
+ Set<String> regexpExpressions = new HashSet<String>();
+
+ for(Map.Entry<String, String> entry: metricsProperties.entrySet()) {
+ String key = entry.getKey();
+ if(key.indexOf(regexpFilterKey) == 0) {
+ regexpExpressions.add(entry.getValue());
+ }
+ }
+
+ if(!regexpExpressions.isEmpty()) {
+ filterList.addMetricFilter(new RegexpMetricsFilter(regexpExpressions));
+ }
+ this.filter = filterList;
+
+ this.period = 60;
+ if(metricsProperties.get(metricsPropertyKey + PERIOD_KEY) != null) {
+ this.period = Integer.parseInt(metricsProperties.get(metricsPropertyKey + PERIOD_KEY));
+ }
+ afterInit();
+ }
+
+ public void start() {
+ start(period, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Starts the reporter polling at the given period.
+ *
+ * @param period the amount of time between polls
+ * @param unit the unit for {@code period}
+ */
+ public void start(long period, TimeUnit unit) {
+ this.period = period;
+ executor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ report();
+ } catch (Exception e) {
+ if(LOG.isDebugEnabled()) {
+ LOG.warn("Metric report error:" + e.getMessage(), e);
+ } else {
+ LOG.warn("Metric report error:" + e.getMessage(), e);
+ }
+ }
+ }
+ }, period, period, unit);
+ }
+
+ /**
+ * Stops the reporter and shuts down its thread of execution.
+ */
+ public void stop() {
+ executor.shutdown();
+ try {
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException ignored) {
+ // do nothing
+ }
+ }
+
+ /**
+ * Stops the reporter and shuts down its thread of execution.
+ */
+ @Override
+ public void close() {
+ stop();
+ }
+
+ /**
+ * Report the current values of all metrics in the registry.
+ */
+ public void report() {
+ report(registry.getGauges(filter),
+ registry.getCounters(filter),
+ registry.getHistograms(filter),
+ registry.getMeters(filter),
+ registry.getTimers(filter));
+ }
+
+ protected String getRateUnit() {
+ return rateUnit;
+ }
+
+ protected String getDurationUnit() {
+ return durationUnit;
+ }
+
+ protected double convertDuration(double duration) {
+ return duration * durationFactor;
+ }
+
+ protected double convertRate(double rate) {
+ return rate * rateFactor;
+ }
+
+ private String calculateRateUnit(TimeUnit unit) {
+ final String s = unit.toString().toLowerCase(Locale.US);
+ return s.substring(0, s.length() - 1);
+ }
+}
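
A rough usage sketch for the scheduled variant (the reporter name "console" and the anonymous subclass are illustrative; init(), start(), and close() are the class's own API, and the same package is assumed). The polling period is read from the property "<metricsName>.<reporterName>.period" in seconds and defaults to 60:

    import com.codahale.metrics.*;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.SortedMap;

    public class ScheduledReporterExample {
      public static void main(String[] args) throws Exception {
        MetricRegistry registry = new MetricRegistry();
        registry.counter("worker.querymaster.task.count").inc();

        Map<String, String> props = new HashMap<String, String>();
        props.put("worker.console.period", "10");  // poll every 10 seconds

        TajoMetricsScheduledReporter reporter = new TajoMetricsScheduledReporter() {
          @Override protected String getReporterName() { return "console"; }
          @Override protected void afterInit() { }
          @Override
          public void report(SortedMap<String, Gauge> gauges,
                             SortedMap<String, Counter> counters,
                             SortedMap<String, Histogram> histograms,
                             SortedMap<String, Meter> meters,
                             SortedMap<String, Timer> timers) {
            System.out.println(counters.keySet());  // any sink will do here
          }
        };
        reporter.init(registry, "worker", "host01:26001", props);
        reporter.start();      // daemon thread fires report() every 10 seconds
        Thread.sleep(25000);
        reporter.close();      // close() delegates to stop()
      }
    }
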
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/webapp/HttpServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/HttpServer.java b/tajo-core/src/main/java/org/apache/tajo/webapp/HttpServer.java
new file mode 100644
index 0000000..60faef2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/HttpServer.java
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.webapp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.Handler;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.SessionIdManager;
+import org.mortbay.jetty.handler.ContextHandlerCollection;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.jetty.servlet.*;
+import org.mortbay.jetty.webapp.WebAppContext;
+import org.mortbay.thread.QueuedThreadPool;
+import org.mortbay.util.MultiException;
+
+import javax.servlet.http.HttpServlet;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.BindException;
+import java.net.URL;
+import java.util.*;
+
+/**
+ * This class is borrowed from Hadoop and simplified for our purposes.
+ */
+public class HttpServer {
+ private static final Log LOG = LogFactory.getLog(HttpServer.class);
+
+ protected final Server webServer;
+ protected final Connector listener;
+ protected final WebAppContext webAppContext;
+ protected final boolean findPort;
+ protected final Map<Context, Boolean> defaultContexts =
+ new HashMap<Context, Boolean>();
+ protected final List<String> filterNames = new ArrayList<String>();
+ private static final int MAX_RETRIES = 10;
+ private final boolean listenerStartedExternally;
+ static final String STATE_DESCRIPTION_ALIVE = " - alive";
+ static final String STATE_DESCRIPTION_NOT_LIVE = " - not live";
+
+ public HttpServer(String name, String bindAddress, int port,
+ boolean findPort, Connector connector, Configuration conf,
+ String[] pathSpecs) throws IOException {
+ this.webServer = new Server();
+ this.findPort = findPort;
+
+ if (connector == null) {
+ listenerStartedExternally = false;
+ listener = createBaseListener(conf);
+ listener.setHost(bindAddress);
+ listener.setPort(port);
+
+ } else {
+ listenerStartedExternally = true;
+ listener = connector;
+ }
+ webServer.addConnector(listener);
+
+ SessionIdManager sessionIdManager = new HashSessionIdManager(new Random(System.currentTimeMillis()));
+ webServer.setSessionIdManager(sessionIdManager);
+
+ int maxThreads = conf.getInt("tajo.http.maxthreads", -1);
+ // If "tajo.http.maxthreads" is not configured, QueuedThreadPool() will use
+ // the default value (currently 250).
+ QueuedThreadPool threadPool = maxThreads == -1 ? new QueuedThreadPool()
+ : new QueuedThreadPool(maxThreads);
+ webServer.setThreadPool(threadPool);
+
+ final String appDir = getWebAppsPath(name);
+ ContextHandlerCollection contexts = new ContextHandlerCollection();
+
+ webAppContext = new WebAppContext();
+ webAppContext.setDisplayName(name);
+ webAppContext.setContextPath("/");
+ webAppContext.setResourceBase(appDir + "/" + name);
+ webAppContext.setDescriptor(appDir + "/" + name + "/WEB-INF/web.xml");
+
+ contexts.addHandler(webAppContext);
+ webServer.setHandler(contexts);
+
+ addDefaultApps(contexts, appDir, conf);
+ }
+
+ /**
+ * Create a required listener for the Jetty instance listening on the port
+ * provided. This wrapper and all subclasses must create at least one
+ * listener.
+ */
+ public Connector createBaseListener(Configuration conf) throws IOException {
+ return HttpServer.createDefaultChannelConnector();
+ }
+
+ static Connector createDefaultChannelConnector() {
+ SelectChannelConnector ret = new SelectChannelConnector();
+ ret.setLowResourceMaxIdleTime(10000);
+ ret.setAcceptQueueSize(128);
+ ret.setResolveNames(false);
+ ret.setUseDirectBuffers(false);
+ return ret;
+ }
+
+ /**
+ * Add default apps.
+ *
+ * @param appDir
+ * The application directory
+ * @throws IOException
+ */
+ protected void addDefaultApps(ContextHandlerCollection parent,
+ final String appDir, Configuration conf) throws IOException {
+ // set up the context for "/logs/" if "hadoop.log.dir" property is defined.
+ String logDir = System.getProperty("tajo.log.dir");
+ if (logDir != null) {
+ Context logContext = new Context(parent, "/logs");
+ logContext.setResourceBase(logDir);
+ //logContext.addServlet(AdminAuthorizedServlet.class, "/*");
+ logContext.setDisplayName("logs");
+ defaultContexts.put(logContext, true);
+ }
+ // set up the context for "/static/*"
+ Context staticContext = new Context(parent, "/static");
+ staticContext.setResourceBase(appDir + "/static");
+ staticContext.addServlet(DefaultServlet.class, "/*");
+ staticContext.setDisplayName("static");
+ defaultContexts.put(staticContext, true);
+ }
+
+ public void addContext(Context ctxt, boolean isFiltered)
+ throws IOException {
+ webServer.addHandler(ctxt);
+ defaultContexts.put(ctxt, isFiltered);
+ }
+
+ /**
+ * Add a context
+ * @param pathSpec The path spec for the context
+ * @param dir The directory containing the context
+ * @param isFiltered if true, the servlet is added to the filter path mapping
+ * @throws IOException
+ */
+ protected void addContext(String pathSpec, String dir, boolean isFiltered) throws IOException {
+ if (0 == webServer.getHandlers().length) {
+ throw new RuntimeException("Couldn't find handler");
+ }
+ WebAppContext webAppCtx = new WebAppContext();
+ webAppCtx.setContextPath(pathSpec);
+ webAppCtx.setWar(dir);
+ addContext(webAppCtx, true);
+ }
+
+ /**
+ * Set a value in the webapp context. These values are available to the jsp
+ * pages as "application.getAttribute(name)".
+ * @param name The name of the attribute
+ * @param value The value of the attribute
+ */
+ public void setAttribute(String name, Object value) {
+ webAppContext.setAttribute(name, value);
+ }
+
+ /**
+ * Add a servlet in the server.
+ * @param name The name of the servlet (can be passed as null)
+ * @param pathSpec The path spec for the servlet
+ * @param clazz The servlet class
+ */
+ public void addServlet(String name, String pathSpec,
+ Class<? extends HttpServlet> clazz) {
+ addInternalServlet(name, pathSpec, clazz, false);
+ addFilterPathMapping(pathSpec, webAppContext);
+ }
+
+ /**
+ * Add an internal servlet in the server, specifying whether or not to
+ * protect with Kerberos authentication.
+ * Note: This method is to be used for adding servlets that facilitate
+ * internal communication and not for user facing functionality. For
+ * servlets added using this method, filters (except internal Kerberized
+ * filters) are not enabled.
+ *
+ * @param name The name of the servlet (can be passed as null)
+ * @param pathSpec The path spec for the servlet
+ * @param clazz The servlet class
+ */
+ public void addInternalServlet(String name, String pathSpec,
+ Class<? extends HttpServlet> clazz, boolean requireAuth) {
+ ServletHolder holder = new ServletHolder(clazz);
+ if (name != null) {
+ holder.setName(name);
+ }
+ webAppContext.addServlet(holder, pathSpec);
+
+ if(requireAuth && UserGroupInformation.isSecurityEnabled()) {
+ LOG.info("Adding Kerberos filter to " + name);
+ ServletHandler handler = webAppContext.getServletHandler();
+ FilterMapping fmap = new FilterMapping();
+ fmap.setPathSpec(pathSpec);
+ fmap.setFilterName("krb5Filter");
+ fmap.setDispatches(Handler.ALL);
+ handler.addFilterMapping(fmap);
+ }
+ }
+
+ /**
+ * Add the path spec to the filter path mapping.
+ * @param pathSpec The path spec
+ * @param webAppCtx The WebApplicationContext to add to
+ */
+ protected void addFilterPathMapping(String pathSpec,
+ Context webAppCtx) {
+ ServletHandler handler = webAppCtx.getServletHandler();
+ for(String name : filterNames) {
+ FilterMapping fmap = new FilterMapping();
+ fmap.setPathSpec(pathSpec);
+ fmap.setFilterName(name);
+ fmap.setDispatches(Handler.ALL);
+ handler.addFilterMapping(fmap);
+ }
+ }
+
+ protected String getWebAppsPath(String name) throws FileNotFoundException {
+ URL url = getClass().getClassLoader().getResource("webapps/" + name);
+ if (url == null) {
+ throw new FileNotFoundException("webapps/" + name + " not found in CLASSPATH");
+ }
+ String urlString = url.toString();
+ return urlString.substring(0, urlString.lastIndexOf('/'));
+ }
+
+ /**
+ * Get the value in the webapp context.
+ * @param name The name of the attribute
+ * @return The value of the attribute
+ */
+ public Object getAttribute(String name) {
+ return webAppContext.getAttribute(name);
+ }
+
+ /**
+ * Get the port that the server is on
+ * @return the port
+ */
+ public int getPort() {
+ return webServer.getConnectors()[0].getLocalPort();
+ }
+
+ /**
+ * Set the min, max number of worker threads (simultaneous connections).
+ */
+ public void setThreads(int min, int max) {
+ QueuedThreadPool pool = (QueuedThreadPool) webServer.getThreadPool() ;
+ pool.setMinThreads(min);
+ pool.setMaxThreads(max);
+ }
+
+ /**
+ * Start the server. Does not wait for the server to start.
+ */
+ public void start() throws IOException {
+ try {
+ if (listenerStartedExternally) { // Expect that listener was started
+ // securely
+ if (listener.getLocalPort() == -1) // ... and verify
+ throw new Exception("Exepected webserver's listener to be started "
+ + "previously but wasn't");
+ // And skip all the port rolling issues.
+ webServer.start();
+ } else {
+ int port;
+ int oriPort = listener.getPort(); // The original requested port
+ while (true) {
+ try {
+ port = webServer.getConnectors()[0].getLocalPort();
+ LOG.debug("Port returned by webServer.getConnectors()[0]."
+ + "getLocalPort() before open() is " + port
+ + ". Opening the listener on " + oriPort);
+ listener.open();
+ port = listener.getLocalPort();
+ LOG.debug("listener.getLocalPort() returned "
+ + listener.getLocalPort()
+ + " webServer.getConnectors()[0].getLocalPort() returned "
+ + webServer.getConnectors()[0].getLocalPort());
+ // Workaround to handle the problem reported in HADOOP-4744
+ if (port < 0) {
+ Thread.sleep(100);
+ int numRetries = 1;
+ while (port < 0) {
+ LOG.warn("listener.getLocalPort returned " + port);
+ if (numRetries++ > MAX_RETRIES) {
+ throw new Exception(" listener.getLocalPort is returning "
+ + "less than 0 even after " + numRetries + " resets");
+ }
+ for (int i = 0; i < 2; i++) {
+ LOG.info("Retrying listener.getLocalPort()");
+ port = listener.getLocalPort();
+ if (port > 0) {
+ break;
+ }
+ Thread.sleep(200);
+ }
+ if (port > 0) {
+ break;
+ }
+ LOG.info("Bouncing the listener");
+ listener.close();
+ Thread.sleep(1000);
+ listener.setPort(oriPort == 0 ? 0 : (oriPort += 1));
+ listener.open();
+ Thread.sleep(100);
+ port = listener.getLocalPort();
+ }
+ } // Workaround end
+ LOG.info("Jetty bound to port " + port);
+ webServer.start();
+ break;
+ } catch (IOException ex) {
+ // if this is a bind exception,
+ // then try the next port number.
+ if (ex instanceof BindException) {
+ if (!findPort) {
+ BindException be = new BindException("Port in use: "
+ + listener.getHost() + ":" + listener.getPort());
+ be.initCause(ex);
+ throw be;
+ }
+ } else {
+ LOG.info("HttpServer.start() threw a non Bind IOException");
+ throw ex;
+ }
+ } catch (MultiException ex) {
+ LOG.info("HttpServer.start() threw a MultiException");
+ throw ex;
+ }
+ listener.setPort((oriPort += 1));
+ }
+ }
+ // Make sure there are no handler failures.
+ Handler[] handlers = webServer.getHandlers();
+ for (int i = 0; i < handlers.length; i++) {
+ if (handlers[i].isFailed()) {
+ throw new IOException(
+ "Problem in starting http server. Server handlers failed");
+ }
+ }
+ } catch (IOException e) {
+ throw e;
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException(
+ "Interrupted while starting HTTP server").initCause(e);
+ } catch (Exception e) {
+ throw new IOException("Problem starting http server", e);
+ }
+ }
+
+ /**
+ * stop the server
+ */
+ public void stop() throws Exception {
+ MultiException exception = null;
+ try {
+ listener.close();
+ } catch (Exception e) {
+ LOG.error(
+ "Error while stopping listener for webapp"
+ + webAppContext.getDisplayName(), e);
+ exception = addMultiException(exception, e);
+ }
+
+ try {
+ // clear & stop webAppContext attributes to avoid memory leaks.
+ webAppContext.clearAttributes();
+ webAppContext.stop();
+ } catch (Exception e) {
+ LOG.error("Error while stopping web app context for webapp "
+ + webAppContext.getDisplayName(), e);
+ exception = addMultiException(exception, e);
+ }
+ try {
+ webServer.stop();
+ } catch (Exception e) {
+ LOG.error(
+ "Error while stopping web server for webapp "
+ + webAppContext.getDisplayName(), e);
+ exception = addMultiException(exception, e);
+ }
+
+ if (exception != null) {
+ exception.ifExceptionThrow();
+ }
+
+ }
+
+ private MultiException addMultiException(MultiException exception, Exception e) {
+ if (exception == null) {
+ exception = new MultiException();
+ }
+ exception.add(e);
+ return exception;
+ }
+
+ public void join() throws InterruptedException {
+ webServer.join();
+ }
+
+ /**
+ * Test for the availability of the web server
+ *
+ * @return true if the web server is started, false otherwise
+ */
+ public boolean isAlive() {
+ return webServer != null && webServer.isStarted();
+ }
+
+ /**
+ * Return the host and port of the HttpServer, if live
+ *
+ * @return a description containing the HTTP URL and liveness state
+ */
+ @Override
+ public String toString() {
+ return listener != null ? ("HttpServer at http://" + listener.getHost()
+ + ":" + listener.getLocalPort() + "/" + (isAlive() ? STATE_DESCRIPTION_ALIVE
+ : STATE_DESCRIPTION_NOT_LIVE))
+ : "Inactive HttpServer";
+ }
+}
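
A minimal sketch of embedding this server (the servlet and the webapp name "admin" are illustrative; the constructor expects a webapps/<name> resource directory on the classpath, which getWebAppsPath() resolves):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tajo.webapp.HttpServer;

    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import java.io.IOException;

    public class WebUiExample {
      // Hypothetical servlet; any HttpServlet subclass will do.
      public static class PingServlet extends HttpServlet {
        @Override
        protected void doGet(HttpServletRequest req, HttpServletResponse resp)
            throws IOException {
          resp.getWriter().write("pong");
        }
      }

      public static void main(String[] args) throws Exception {
        HttpServer server = new HttpServer("admin", "0.0.0.0", 8080,
            true /* findPort: roll to the next port if 8080 is taken */,
            null /* null connector -> default SelectChannelConnector */,
            new Configuration(), null /* pathSpecs */);
        server.addServlet("ping", "/ping", PingServlet.class);
        server.start();
        server.join();
      }
    }
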
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
new file mode 100644
index 0000000..faeadaf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.webapp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.client.QueryStatus;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.jdbc.TajoResultSet;
+import org.apache.tajo.util.JSPUtil;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class QueryExecutorServlet extends HttpServlet {
+ private static final Log LOG = LogFactory.getLog(QueryExecutorServlet.class);
+
+ ObjectMapper om = new ObjectMapper();
+
+ //queryRunnerId -> QueryRunner
+ private final Map<String, QueryRunner> queryRunners = new HashMap<String, QueryRunner>();
+
+ private TajoClient tajoClient;
+
+ private ExecutorService queryRunnerExecutor = Executors.newFixedThreadPool(5);
+
+ private QueryRunnerCleaner queryRunnerCleaner;
+ @Override
+ public void init(ServletConfig config) throws ServletException {
+ om.getDeserializationConfig().disable(
+ DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES);
+
+ try {
+ tajoClient = new TajoClient(new TajoConf());
+
+ queryRunnerCleaner = new QueryRunnerCleaner();
+ queryRunnerCleaner.start();
+ } catch (IOException e) {
+ LOG.error(e.getMessage());
+ }
+ }
+
+ @Override
+ public void service(HttpServletRequest request,
+ HttpServletResponse response) throws ServletException, IOException {
+ String action = request.getParameter("action");
+ Map<String, Object> returnValue = new HashMap<String, Object>();
+ try {
+ if(tajoClient == null) {
+ errorResponse(response, "TajoClient not initialized");
+ return;
+ }
+ if(action == null || action.trim().isEmpty()) {
+ errorResponse(response, "no action parameter.");
+ return;
+ }
+
+ if("runQuery".equals(action)) {
+ String query = request.getParameter("query");
+ if(query == null || query.trim().isEmpty()) {
+ errorResponse(response, "No query parameter");
+ return;
+ }
+ String queryRunnerId = null;
+ while(true) {
+ synchronized(queryRunners) {
+ queryRunnerId = "" + System.currentTimeMillis();
+ if(!queryRunners.containsKey(queryRunnerId)) {
+ break;
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ QueryRunner queryRunner = new QueryRunner(queryRunnerId, query);
+ try {
+ queryRunner.sizeLimit = Integer.parseInt(request.getParameter("limitSize"));
+ } catch (java.lang.NumberFormatException nfe) {
+ queryRunner.sizeLimit = 1048576;
+ }
+ synchronized(queryRunners) {
+ queryRunners.put(queryRunnerId, queryRunner);
+ }
+ queryRunnerExecutor.submit(queryRunner);
+ returnValue.put("queryRunnerId", queryRunnerId);
+ } else if("getQueryProgress".equals(action)) {
+ synchronized(queryRunners) {
+ String queryRunnerId = request.getParameter("queryRunnerId");
+ QueryRunner queryRunner = queryRunners.get(queryRunnerId);
+ if(queryRunner == null) {
+ errorResponse(response, "No query info:" + queryRunnerId);
+ return;
+ }
+ if(queryRunner.error != null) {
+ errorResponse(response, queryRunner.error);
+ return;
+ }
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ returnValue.put("progress", queryRunner.progress);
+ returnValue.put("startTime", df.format(queryRunner.startTime));
+ returnValue.put("finishTime", queryRunner.finishTime == 0 ? "-" : df.format(queryRunner.startTime));
+ returnValue.put("runningTime", JSPUtil.getElapsedTime(queryRunner.startTime, queryRunner.finishTime));
+ }
+ } else if("getQueryResult".equals(action)) {
+ synchronized(queryRunners) {
+ String queryRunnerId = request.getParameter("queryRunnerId");
+ QueryRunner queryRunner = queryRunners.get(queryRunnerId);
+ if(queryRunner == null) {
+ errorResponse(response, "No query info:" + queryRunnerId);
+ return;
+ }
+ if(queryRunner.error != null) {
+ errorResponse(response, queryRunner.error);
+ return;
+ }
+ returnValue.put("numOfRows", queryRunner.numOfRows);
+ returnValue.put("resultSize", queryRunner.resultSize);
+ returnValue.put("resultData", queryRunner.queryResult);
+ returnValue.put("resultColumns", queryRunner.columnNames);
+ returnValue.put("runningTime", JSPUtil.getElapsedTime(queryRunner.startTime, queryRunner.finishTime));
+ }
+ } else if("clearAllQueryRunner".equals(action)) {
+ synchronized(queryRunners) {
+ for(QueryRunner eachQueryRunner: queryRunners.values()) {
+ eachQueryRunner.setStop();
+ }
+ queryRunners.clear();
+ }
+ }
+ returnValue.put("success", "true");
+ writeHttpResponse(response, returnValue);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ errorResponse(response, e);
+ }
+ }
+
+ private void errorResponse(HttpServletResponse response, Exception e) throws IOException {
+ errorResponse(response, e.getMessage() + "\n" + StringUtils.stringifyException(e));
+ }
+
+ private void errorResponse(HttpServletResponse response, String message) throws IOException {
+ Map<String, Object> errorMessage = new HashMap<String, Object>();
+ errorMessage.put("success", "false");
+ errorMessage.put("errorMessage", message);
+ writeHttpResponse(response, errorMessage);
+ }
+
+ private void writeHttpResponse(HttpServletResponse response, Map<String, Object> outputMessage) throws IOException {
+ response.setContentType("text/html");
+
+ OutputStream out = response.getOutputStream();
+ out.write(om.writeValueAsBytes(outputMessage));
+
+ out.flush();
+ out.close();
+ }
+
+ class QueryRunnerCleaner extends Thread {
+ public void run() {
+ List<QueryRunner> queryRunnerList;
+ // Sweep periodically; evict runners finished more than 3 minutes ago.
+ while(true) {
+ try {
+ Thread.sleep(60 * 1000);
+ } catch (InterruptedException e) {
+ break;
+ }
+ synchronized(queryRunners) {
+ queryRunnerList = new ArrayList<QueryRunner>(queryRunners.values());
+ for(QueryRunner eachQueryRunner: queryRunnerList) {
+ if(!eachQueryRunner.running.get() &&
+ (System.currentTimeMillis() - eachQueryRunner.finishTime > 180 * 1000)) {
+ queryRunners.remove(eachQueryRunner.queryRunnerId);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ class QueryRunner extends Thread {
+ long startTime;
+ long finishTime;
+
+ String queryRunnerId;
+
+ ClientProtos.SubmitQueryResponse queryResponse;
+ AtomicBoolean running = new AtomicBoolean(true);
+ AtomicBoolean stop = new AtomicBoolean(false);
+ QueryId queryId;
+ String query;
+ long resultSize;
+ int sizeLimit;
+ long numOfRows;
+ Exception error;
+
+ AtomicInteger progress = new AtomicInteger(0);
+
+ List<String> columnNames = new ArrayList<String>();
+
+ List<List<Object>> queryResult;
+
+ public QueryRunner(String queryRunnerId, String query) {
+ this.queryRunnerId = queryRunnerId;
+ this.query = query;
+ }
+
+ public void setStop() {
+ this.stop.set(true);
+ this.interrupt();
+ }
+
+ public void run() {
+ startTime = System.currentTimeMillis();
+ try {
+ queryResponse = tajoClient.executeQuery(query);
+ if (queryResponse.getResultCode() == ClientProtos.ResultCode.OK) {
+ QueryId queryId = null;
+ try {
+ queryId = new QueryId(queryResponse.getQueryId());
+ getQueryResult(queryId);
+ } finally {
+ if (queryId != null) {
+ tajoClient.closeQuery(queryId);
+ }
+ }
+ } else {
+ LOG.error("queryRespons.getResultCode() not OK:" + queryRespons.getResultCode());
+ error = new Exception("queryRespons.getResultCode() not OK:" + queryRespons.getResultCode());
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ error = e;
+ } finally {
+ running.set(false);
+ finishTime = System.currentTimeMillis();
+ }
+ }
+
+ private void getQueryResult(QueryId tajoQueryId) {
+ // query execute
+ try {
+ QueryStatus status = null;
+
+ while (!stop.get()) {
+ try {
+ Thread.sleep(1000);
+ } catch(InterruptedException e) {
+ break;
+ }
+ status = tajoClient.getQueryStatus(tajoQueryId);
+ if (status.getState() == TajoProtos.QueryState.QUERY_MASTER_INIT
+ || status.getState() == TajoProtos.QueryState.QUERY_MASTER_LAUNCHED) {
+ continue;
+ }
+
+ if (status.getState() == TajoProtos.QueryState.QUERY_RUNNING
+ || status.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+ int progressValue = (int) (status.getProgress() * 100.0f);
+ if(progressValue == 100) {
+ progressValue = 99;
+ }
+ progress.set(progressValue);
+ }
+ if (status.getState() != TajoProtos.QueryState.QUERY_RUNNING
+ && status.getState() != TajoProtos.QueryState.QUERY_NOT_ASSIGNED) {
+ break;
+ }
+ }
+
+ if(status == null) {
+ LOG.error("Query Status is null");
+ error = new Exception("Query Status is null");
+ return;
+ }
+ if (status.getState() == TajoProtos.QueryState.QUERY_ERROR ||
+ status.getState() == TajoProtos.QueryState.QUERY_FAILED) {
+ error = new Exception(status.getErrorMessage());
+ } else if (status.getState() == TajoProtos.QueryState.QUERY_KILLED) {
+ LOG.info(queryId + " is killed.");
+ error = new Exception(queryId + " is killed.");
+ } else {
+ if (status.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+ if (status.hasResult()) {
+ ResultSet res = null;
+ try {
+ ClientProtos.GetQueryResultResponse response = tajoClient.getResultResponse(tajoQueryId);
+ TableDesc desc = CatalogUtil.newTableDesc(response.getTableDesc());
+ tajoClient.getConf().setVar(TajoConf.ConfVars.USERNAME, response.getTajoUserName());
+ res = new TajoResultSet(tajoClient, queryId, tajoClient.getConf(), desc);
+
+ ResultSetMetaData rsmd = res.getMetaData();
+ resultSize = desc.getStats().getNumBytes();
+ LOG.info("Tajo Query Result: " + desc.getPath() + "\n");
+
+ int numOfColumns = rsmd.getColumnCount();
+ for(int i = 0; i < numOfColumns; i++) {
+ columnNames.add(rsmd.getColumnName(i + 1));
+ }
+ queryResult = new ArrayList<List<Object>>();
+
+ if(sizeLimit < resultSize) {
+ numOfRows = (long)((float)(desc.getStats().getNumRows()) * ((float)sizeLimit / (float)resultSize));
+ } else {
+ numOfRows = desc.getStats().getNumRows();
+ }
+ int rowCount = 0;
+ boolean hasMoreData = false;
+ while (res.next()) {
+ if(rowCount > numOfRows) {
+ hasMoreData = true;
+ break;
+ }
+ List<Object> row = new ArrayList<Object>();
+ for(int i = 0; i < numOfColumns; i++) {
+ row.add(res.getObject(i + 1).toString());
+ }
+ queryResult.add(row);
+ rowCount++;
+
+ }
+ } finally {
+ if (res != null) {
+ res.close();
+ }
+ progress.set(100);
+ }
+ } else {
+ error = new Exception(queryId + " no result");
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ error = e;
+ }
+ }
+ }
+}
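
The servlet above implements a small JSON-over-HTTP protocol: action=runQuery returns a queryRunnerId, which the caller polls with action=getQueryProgress and finally action=getQueryResult. A client sketch (host, port, and servlet path are illustrative; JSON parsing is left out):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URL;
    import java.net.URLEncoder;

    public class QueryExecutorClientExample {
      static String call(String base, String params) throws Exception {
        URL url = new URL(base + "?" + params);
        BufferedReader in = new BufferedReader(
            new InputStreamReader(url.openStream(), "UTF-8"));
        try {
          StringBuilder sb = new StringBuilder();
          String line;
          while ((line = in.readLine()) != null) sb.append(line);
          return sb.toString();   // the JSON map written by writeHttpResponse()
        } finally {
          in.close();
        }
      }

      public static void main(String[] args) throws Exception {
        String base = "http://host01:26080/query_exec";   // illustrative endpoint
        String submitted = call(base, "action=runQuery&limitSize=1048576&query="
            + URLEncoder.encode("select * from lineitem", "UTF-8"));
        // Extract "queryRunnerId" from `submitted` with any JSON parser, then:
        //   call(base, "action=getQueryProgress&queryRunnerId=" + id);
        //   call(base, "action=getQueryResult&queryRunnerId=" + id);
      }
    }
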
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java b/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
new file mode 100644
index 0000000..09426e0
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.webapp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.worker.TajoWorker;
+import org.mortbay.jetty.Connector;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+
+public class StaticHttpServer extends HttpServer {
+ private static StaticHttpServer instance = null;
+
+ private StaticHttpServer(Object containerObject , String name, String bindAddress, int port,
+ boolean findPort, Connector connector, Configuration conf,
+ String[] pathSpecs) throws IOException {
+ super( name, bindAddress, port, findPort, connector, conf, pathSpecs);
+ }
+ public static StaticHttpServer getInstance() {
+ return instance;
+ }
+ public static StaticHttpServer getInstance(Object containerObject, String name,
+ String bindAddress, int port, boolean findPort, Connector connector,
+ TajoConf conf,
+ String[] pathSpecs) throws IOException {
+ String addr = bindAddress;
+ if(instance == null) {
+ if(bindAddress == null || bindAddress.compareTo("") == 0) {
+ if (containerObject instanceof TajoMaster) {
+ addr = conf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS).split(":")[0];
+ } else if (containerObject instanceof TajoWorker) {
+ addr = Inet4Address.getLocalHost().getHostName();
+ }
+ }
+
+ instance = new StaticHttpServer(containerObject, name, addr, port,
+ findPort, connector, conf, pathSpecs);
+ instance.setAttribute("tajo.info.server.object", containerObject);
+ instance.setAttribute("tajo.info.server.addr", addr);
+ instance.setAttribute("tajo.info.server.conf", conf);
+ instance.setAttribute("tajo.info.server.starttime", System.currentTimeMillis());
+ }
+ return instance;
+ }
+
+ public void set(String name, Object object) {
+ instance.setAttribute(name, object);
+ }
+}
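
A sketch of bootstrapping the singleton at daemon startup (the port and placement are illustrative). With a null or empty bindAddress, the address is derived from the master umbilical RPC address for a TajoMaster, or the local hostname for a TajoWorker:

    // Inside e.g. TajoMaster initialization (hypothetical placement):
    TajoConf conf = new TajoConf();
    StaticHttpServer server = StaticHttpServer.getInstance(
        this /* containerObject */, "admin", null /* derive bindAddress */,
        26080, true, null, conf, null);
    server.start();
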
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
new file mode 100644
index 0000000..55aa8c4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tajo.master.ContainerProxy;
+
+import java.util.Map;
+
+public abstract class AbstractResourceAllocator extends CompositeService implements ResourceAllocator {
+ private Map<ContainerId, ContainerProxy> containers = Maps.newConcurrentMap();
+
+ public AbstractResourceAllocator() {
+ super(AbstractResourceAllocator.class.getName());
+ }
+
+ public void addContainer(ContainerId cId, ContainerProxy container) {
+ containers.put(cId, container);
+ }
+
+ public void removeContainer(ContainerId cId) {
+ containers.remove(cId);
+ }
+
+ public boolean containsContainer(ContainerId cId) {
+ return containers.containsKey(cId);
+ }
+
+ public ContainerProxy getContainer(ContainerId cId) {
+ return containers.get(cId);
+ }
+
+ public Map<ContainerId, ContainerProxy> getContainers() {
+ return containers;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/DeletionService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/DeletionService.java b/tajo-core/src/main/java/org/apache/tajo/worker/DeletionService.java
new file mode 100644
index 0000000..42ea71f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/DeletionService.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+
+import java.io.IOException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+public class DeletionService {
+ static final Log LOG = LogFactory.getLog(DeletionService.class);
+
+ private int debugDelay;
+ private ScheduledThreadPoolExecutor sched;
+ private final FileContext lfs = getLfs();
+
+ static final FileContext getLfs() {
+ try {
+ return FileContext.getLocalFSFileContext();
+ } catch (UnsupportedFileSystemException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public DeletionService(int defaultThreads, int debugDelay) {
+ ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat("DeletionService #%d").build();
+
+ sched = new ScheduledThreadPoolExecutor(defaultThreads, tf);
+ sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ sched.setKeepAliveTime(60L, TimeUnit.SECONDS);
+ this.debugDelay = debugDelay;
+ }
+
+
+ /**
+ * Delete the path(s) as this user.
+ *
+ * @param subDir the sub directory name
+ * @param baseDirs the base directories which contains the subDir's
+ */
+ public void delete(Path subDir, Path... baseDirs) {
+ if (debugDelay != -1) {
+ sched.schedule(new FileDeletion(subDir, baseDirs), debugDelay, TimeUnit.SECONDS);
+ }
+ }
+
+ public void stop() {
+ sched.shutdown();
+ boolean terminated = false;
+ try {
+ terminated = sched.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ }
+ if (!terminated) {
+ sched.shutdownNow();
+ }
+ }
+
+ private class FileDeletion implements Runnable {
+ final Path subDir;
+ final Path[] baseDirs;
+
+ FileDeletion(Path subDir, Path[] baseDirs) {
+ this.subDir = subDir;
+ this.baseDirs = baseDirs;
+ }
+
+ @Override
+ public void run() {
+
+ if (baseDirs == null || baseDirs.length == 0) {
+ LOG.debug("Worker deleting absolute path : " + subDir);
+ try {
+ lfs.delete(subDir, true);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete " + subDir);
+ }
+ return;
+ }
+ for (Path baseDir : baseDirs) {
+ Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
+ LOG.debug("Worker deleting path : " + del);
+ try {
+ lfs.delete(del, true);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete " + subDir);
+ }
+ }
+ }
+ }
+}
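
A usage sketch for the deletion service (the paths are illustrative). delete(subDir, base1, base2) schedules removal of base1/subDir and base2/subDir after debugDelay seconds; note that stop() discards still-delayed tasks, since the executor is configured not to run delayed tasks after shutdown:

    import org.apache.hadoop.fs.Path;

    public class DeletionServiceExample {
      public static void main(String[] args) throws Exception {
        DeletionService deletion = new DeletionService(4 /* threads */, 0 /* delay */);
        deletion.delete(new Path("attempt_0001"),
            new Path("/tmp/tajo-localdir-1"), new Path("/tmp/tajo-localdir-2"));
        Thread.sleep(1000);  // give the scheduled deletion a chance to run
        deletion.stop();
      }
    }
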
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
new file mode 100644
index 0000000..bb136f7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.*;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.channels.FileChannel;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+/**
+ * Fetcher fetches data from a given URI over HTTP and stores it in a
+ * local file. It aims at asynchronous and efficient data transfer.
+ */
+public class Fetcher {
+ private final static Log LOG = LogFactory.getLog(Fetcher.class);
+
+ private final URI uri;
+ private final File file;
+
+ private final String host;
+ private int port;
+
+ private long startTime;
+ private long finishTime;
+ private long fileLen;
+ private int messageReceiveCount;
+
+ private ClientBootstrap bootstrap;
+
+ public Fetcher(URI uri, File file, ClientSocketChannelFactory factory) {
+ this.uri = uri;
+ this.file = file;
+
+ String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
+ this.host = uri.getHost() == null ? "localhost" : uri.getHost();
+ this.port = uri.getPort();
+ if (port == -1) {
+ if (scheme.equalsIgnoreCase("http")) {
+ this.port = 80;
+ } else if (scheme.equalsIgnoreCase("https")) {
+ this.port = 443;
+ }
+ }
+
+ bootstrap = new ClientBootstrap(factory);
+ bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
+ bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
+ bootstrap.setOption("tcpNoDelay", true);
+
+ ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(file);
+ bootstrap.setPipelineFactory(pipelineFactory);
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ public long getFileLen() {
+ return fileLen;
+ }
+
+ public int getMessageReceiveCount() {
+ return messageReceiveCount;
+ }
+
+ public String getStatus() {
+ if(startTime == 0) {
+ return "READY";
+ }
+
+ if(startTime > 0 && finishTime == 0) {
+ return "FETCHING";
+ } else {
+ return "FINISH";
+ }
+ }
+
+ public File get() throws IOException {
+ startTime = System.currentTimeMillis();
+
+ ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
+
+ // Wait until the connection attempt succeeds or fails.
+ Channel channel = future.awaitUninterruptibly().getChannel();
+ if (!future.isSuccess()) {
+ future.getChannel().close();
+ throw new IOException(future.getCause());
+ }
+
+ String query = uri.getPath()
+ + (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : "");
+ // Prepare the HTTP request.
+ HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query);
+ request.setHeader(HttpHeaders.Names.HOST, host);
+ LOG.info("Fetch: " + uri);
+ request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+ request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
+
+ // Send the HTTP request.
+ ChannelFuture channelFuture = channel.write(request);
+
+ // Wait for the server to close the connection.
+ channel.getCloseFuture().awaitUninterruptibly();
+
+ channelFuture.addListener(ChannelFutureListener.CLOSE);
+
+ // Close the channel to exit.
+ future.getChannel().close();
+ finishTime = System.currentTimeMillis();
+ return file;
+ }
+
+ public URI getURI() {
+ return this.uri;
+ }
+
+ class HttpClientHandler extends SimpleChannelUpstreamHandler {
+ private volatile boolean readingChunks;
+ private final File file;
+ private RandomAccessFile raf;
+ private FileChannel fc;
+ private long length = -1;
+
+ public HttpClientHandler(File file) throws FileNotFoundException {
+ this.file = file;
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ messageReceiveCount++;
+ try {
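+ // The first message is the HttpResponse; when the transfer is chunked,
+ // the body follows as a sequence of HttpChunk messages.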
+ if (!readingChunks) {
+ HttpResponse response = (HttpResponse) e.getMessage();
+
+ StringBuilder sb = new StringBuilder();
+ if (LOG.isDebugEnabled()) {
+ sb.append("STATUS: ").append(response.getStatus())
+ .append(", VERSION: ").append(response.getProtocolVersion())
+ .append(", HEADER: ");
+ }
+ if (!response.getHeaderNames().isEmpty()) {
+ for (String name : response.getHeaderNames()) {
+ for (String value : response.getHeaders(name)) {
+ if (LOG.isDebugEnabled()) {
+ sb.append(name).append(" = ").append(value);
+ }
+ if (this.length == -1 && name.equals("Content-Length")) {
+ this.length = Long.valueOf(value);
+ }
+ }
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sb.toString());
+ }
+
+ if (response.getStatus() == HttpResponseStatus.NO_CONTENT) {
+ LOG.info("There are no data corresponding to the request");
+ return;
+ }
+
+ this.raf = new RandomAccessFile(file, "rw");
+ this.fc = raf.getChannel();
+
+ if (response.isChunked()) {
+ readingChunks = true;
+ } else {
+ ChannelBuffer content = response.getContent();
+ if (content.readable()) {
+ fc.write(content.toByteBuffer());
+ }
+ }
+ } else {
+ HttpChunk chunk = (HttpChunk) e.getMessage();
+ if (chunk.isLast()) {
+ readingChunks = false;
+ long fileLength = file.length();
+ if (fileLength == length) {
+ LOG.info("Data fetch is done (total received bytes: " + fileLength
+ + ")");
+ } else {
+ LOG.info("Data fetch is done, but cannot get all data "
+ + "(received/total: " + fileLength + "/" + length + ")");
+ }
+ } else {
+ fc.write(chunk.getContent().toByteBuffer());
+ }
+ }
+ } finally {
+ if (raf != null) {
+ fileLen = file.length();
+ }
+ if (fileLen >= length) {
+ IOUtils.cleanup(LOG, fc, raf);
+ }
+ }
+ }
+ }
+
+ class HttpClientPipelineFactory implements
+ ChannelPipelineFactory {
+ private final File file;
+
+ public HttpClientPipelineFactory(File file) {
+ this.file = file;
+ }
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = pipeline();
+
+ pipeline.addLast("codec", new HttpClientCodec());
+ pipeline.addLast("inflater", new HttpContentDecompressor());
+ pipeline.addLast("handler", new HttpClientHandler(file));
+ return pipeline;
+ }
+ }
+}
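
For reference, a minimal sketch of how a Fetcher can be driven (the Netty
NioClientSocketChannelFactory setup, the shuffle URI, and the output path are
illustrative assumptions, not part of this patch):

  import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
  import java.util.concurrent.Executors;

  ClientSocketChannelFactory factory = new NioClientSocketChannelFactory(
      Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
  Fetcher fetcher = new Fetcher(
      URI.create("http://192.168.0.1:8080/?qid=q_000001&fn=part-0"), // hypothetical
      new File("/tmp/fetch/part-0"),                                 // hypothetical
      factory);
  File fetched = fetcher.get(); // blocks until the server closes the connection
  factory.releaseExternalResources();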
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
new file mode 100644
index 0000000..42ad875
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
+import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
+import org.apache.tajo.worker.dataserver.retriever.FileChunk;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+@Deprecated
+public class InterDataRetriever implements DataRetriever {
+ private static final Log LOG = LogFactory.getLog(InterDataRetriever.class);
+ private final Set<QueryUnitId> registered = Sets.newHashSet();
+ private final Map<String, String> map = Maps.newConcurrentMap();
+
+ public InterDataRetriever() {
+ }
+
+ public void register(QueryUnitId id, String baseURI) {
+ synchronized (registered) {
+ if (!registered.contains(id)) {
+ map.put(id.toString(), baseURI);
+ registered.add(id);
+ }
+ }
+ }
+
+ public void unregister(QueryUnitId id) {
+ synchronized (registered) {
+ if (registered.contains(id)) {
+ map.remove(id.toString());
+ registered.remove(id);
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tajo.worker.dataserver.retriever.DataRetriever#handle(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.handler.codec.http.HttpRequest)
+ */
+ @Override
+ public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+ throws IOException {
+
+ int start = request.getUri().indexOf('?');
+ if (start < 0) {
+ throw new IllegalArgumentException("Wrong request: " + request.getUri());
+ }
+
+ String queryStr = request.getUri().substring(start + 1);
+ LOG.info("QUERY: " + queryStr);
+ String [] queries = queryStr.split("&");
+
+ String qid = null;
+ String fn = null;
+ String [] kv;
+ for (String query : queries) {
+ kv = query.split("=");
+ if (kv[0].equals("qid")) {
+ qid = kv[1];
+ } else if (kv[0].equals("fn")) {
+ fn = kv[1];
+ }
+ }
+
+ String baseDir = map.get(qid);
+ if (baseDir == null) {
+ throw new FileNotFoundException("No such qid: " + qid);
+ }
+
+ File file = new File(baseDir + "/" + fn);
+ if (file.isHidden() || !file.exists()) {
+ throw new FileNotFoundException("No such file: " + baseDir + "/"
+ + file.getName());
+ }
+ if (!file.isFile()) {
+ throw new FileAccessForbiddenException("No such file: "
+ + baseDir + "/" + file.getName());
+ }
+
+ return new FileChunk[] {new FileChunk(file, 0, file.length())};
+ }
+}
\ No newline at end of file
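
For reference, a minimal sketch of how this retriever is registered and what
request it serves (the base URI and file name are hypothetical):

  InterDataRetriever retriever = new InterDataRetriever();
  retriever.register(queryUnitId, "/tmp/tajo/q_000001/out"); // hypothetical
  // An HTTP request whose URI carries "?qid=" + queryUnitId + "&fn=part-0"
  // then resolves to /tmp/tajo/q_000001/out/part-0 and is served back
  // as a single FileChunk spanning the whole file.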
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
new file mode 100644
index 0000000..36e7353
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.worker.dataserver.retriever.FileChunk;
+import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class PartitionRetrieverHandler implements RetrieverHandler {
+ private final String baseDir;
+
+ public PartitionRetrieverHandler(String baseDir) {
+ this.baseDir = baseDir;
+ }
+
+ @Override
+ public FileChunk get(Map<String, List<String>> kvs) throws IOException {
+ // No need to verify the file here; AdvancedDataRetriever has already
+ // checked its validity.
+ File file = new File(baseDir + "/" + kvs.get("fn").get(0));
+
+ return new FileChunk(file, 0, file.length());
+ }
+}
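
For reference, a minimal sketch of the parameter map this handler consumes
(the base directory and file name are hypothetical; Map, List, HashMap, and
Collections come from java.util):

  RetrieverHandler handler = new PartitionRetrieverHandler("/tmp/tajo/out");
  Map<String, List<String>> kvs = new HashMap<String, List<String>>();
  kvs.put("fn", Collections.singletonList("part-0"));
  FileChunk chunk = handler.get(kvs); // spans all of /tmp/tajo/out/part-0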
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
new file mode 100644
index 0000000..be33a12
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.worker.dataserver.retriever.FileChunk;
+import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * It retrieves the file chunk ranged between the start and end keys.
+ * The start key is inclusive, but the end key is exclusive.
+ *
+ * Internally, there are four cases:
+ * <ul>
+ * <li>out of scope: the index range does not overlap with the query range.</li>
+ * <li>overlapped: the index range partially overlaps with the query range.</li>
+ * <li>included: the index range is included in the start and end keys.</li>
+ * <li>covered: the index range covers the query range (i.e., start and end keys).</li>
+ * </ul>
+ */
+public class RangeRetrieverHandler implements RetrieverHandler {
+ private static final Log LOG = LogFactory.getLog(RangeRetrieverHandler.class);
+ private final File file;
+ private final BSTIndex.BSTIndexReader idxReader;
+ private final Schema schema;
+ private final TupleComparator comp;
+ private final RowStoreDecoder decoder;
+
+ public RangeRetrieverHandler(File outDir, Schema schema, TupleComparator comp) throws IOException {
+ this.file = outDir;
+ BSTIndex index = new BSTIndex(new TajoConf());
+ this.schema = schema;
+ this.comp = comp;
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ Path indexPath = fs.makeQualified(new Path(outDir.getCanonicalPath(), "index"));
+ this.idxReader =
+ index.getIndexReader(indexPath, this.schema, this.comp);
+ this.idxReader.open();
+ LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
+ + idxReader.getLastKey());
+ this.decoder = RowStoreUtil.createDecoder(schema);
+ }
+
+ @Override
+ public FileChunk get(Map<String, List<String>> kvs) throws IOException {
+ // No need to verify the file here; AdvancedDataRetriever has already
+ // checked its validity.
+ File data = new File(this.file, "data/data");
+ byte [] startBytes = Base64.decodeBase64(kvs.get("start").get(0));
+ Tuple start = decoder.toTuple(startBytes);
+ byte [] endBytes = Base64.decodeBase64(kvs.get("end").get(0));
+ Tuple end = decoder.toTuple(endBytes);
+ boolean last = kvs.containsKey("final");
+
+ if(!comp.isAscendingFirstKey()) {
+ Tuple tmpKey = start;
+ start = end;
+ end = tmpKey;
+ }
+
+ LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
+ (last ? ", last=true" : "") + ")");
+
+ if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
+ LOG.info("There is no contents");
+ return null;
+ }
+
+ if (comp.compare(end, idxReader.getFirstKey()) < 0 ||
+ comp.compare(idxReader.getLastKey(), start) < 0) {
+ LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
+ "], but request start:" + start + ", end: " + end);
+ return null;
+ }
+
+ long startOffset;
+ long endOffset;
+ try {
+ startOffset = idxReader.find(start);
+ } catch (IOException ioe) {
+ LOG.error("State Dump (the requested range: "
+ + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ throw ioe;
+ }
+ try {
+ endOffset = idxReader.find(end);
+ if (endOffset == -1) {
+ endOffset = idxReader.find(end, true);
+ }
+ } catch (IOException ioe) {
+ LOG.error("State Dump (the requested range: "
+ + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ throw ioe;
+ }
+
+ // A startOffset of -1 means the start key itself is not indexed
+ // (the overlapped or covered case); fall back to the nearest following key.
+ if (startOffset == -1) {
+ try {
+ startOffset = idxReader.find(start, true);
+ } catch (IOException ioe) {
+ LOG.error("State Dump (the requested range: "
+ + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ throw ioe;
+ }
+ }
+
+ if (startOffset == -1) {
+ throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
+ "State Dump (the requested range: "
+ + "[" + start + ", " + end + ")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey() + ")");
+ }
+
+ // if greater than indexed values
+ if (last || (endOffset == -1 && comp.compare(idxReader.getLastKey(), end) < 0)) {
+ endOffset = data.length();
+ }
+
+ FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
+ LOG.info("Retrieve File Chunk: " + chunk);
+ return chunk;
+ }
+}
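
For reference, a minimal sketch of how a caller builds the parameter map,
assuming RowStoreUtil also provides the encoder counterpart of the decoder
used above (the schema, the start/end tuples, and the handler instance come
from elsewhere and are hypothetical here):

  RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
  Map<String, List<String>> kvs = new HashMap<String, List<String>>();
  kvs.put("start", Collections.singletonList(
      new String(Base64.encodeBase64(encoder.toBytes(startKey)))));
  kvs.put("end", Collections.singletonList(
      new String(Base64.encodeBase64(encoder.toBytes(endKey)))));
  // kvs.put("final", Collections.singletonList("true")); // for the last fetch
  FileChunk chunk = handler.get(kvs); // null when the range is out of scope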
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
new file mode 100644
index 0000000..8b9219c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+
+public interface ResourceAllocator {
+ public void allocateTaskWorker();
+ public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerId);
+ public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
+ int numTasks, int memoryMBPerTask);
+}
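
For reference, a minimal sketch of the sizing policy an implementation might
use for calculateNumRequestContainers (the cluster-memory figure and the
policy itself are illustrative assumptions, not the actual Tajo logic):

  @Override
  public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
                                           int numTasks, int memoryMBPerTask) {
    int availableMemoryMB = 4 * 1024; // hypothetical cluster capacity in MB
    // Request at least one container, but never more than the task count
    // or more than the available memory can host.
    return Math.min(numTasks, Math.max(1, availableMemoryMB / memoryMBPerTask));
  }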
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
new file mode 100644
index 0000000..1731854
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManagerFactory;
+
+import java.io.IOException;
+
+public class TajoQueryEngine {
+
+ private final AbstractStorageManager storageManager;
+ private final PhysicalPlanner phyPlanner;
+
+ public TajoQueryEngine(TajoConf conf) throws IOException {
+ this.storageManager = StorageManagerFactory.getStorageManager(conf);
+ this.phyPlanner = new PhysicalPlannerImpl(conf, storageManager);
+ }
+
+ public PhysicalExec createPlan(TaskAttemptContext ctx, LogicalNode plan)
+ throws InternalException {
+ return phyPlanner.createPlan(ctx, plan);
+ }
+
+ public void stop() throws IOException {
+ }
+}
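
For reference, a minimal sketch of driving the engine (the TaskAttemptContext
and the logical plan root are assumed to come from task setup elsewhere; the
init/next/close loop follows the usual PhysicalExec convention):

  TajoQueryEngine engine = new TajoQueryEngine(new TajoConf());
  PhysicalExec exec = engine.createPlan(taskAttemptContext, logicalPlanRoot);
  exec.init();
  Tuple tuple;
  while ((tuple = exec.next()) != null) {
    // consume each produced tuple
  }
  exec.close();
  engine.stop();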