You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 11:19:42 UTC
[20/51] [partial] TAJO-752: Escalate sub modules in tajo-core into
the top-level modules. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java
new file mode 100644
index 0000000..64c6fc6
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+/**
+ * Kinds of events carried by a SessionEvent.
+ */
+public enum SessionEventType {
+ // Dispatched by SessionLivelinessMonitor#expire; SessionManager#handle removes the session for this type.
+ EXPIRE,
+ // SessionManager#handle only refreshes (touches) the session for this type.
+ // NOTE(review): no producer of PING is visible in this patch - presumably a client keep-alive; confirm.
+ PING
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
new file mode 100644
index 0000000..483920f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tajo.conf.TajoConf;
+
+/**
+ * Liveliness monitor for client sessions, keyed by session id.
+ *
+ * When the underlying {@link AbstractLivelinessMonitor} decides that a session id
+ * has expired, a {@link SessionEvent} of type {@link SessionEventType#EXPIRE} is
+ * handed to the event handler obtained from the dispatcher.
+ */
+public class SessionLivelinessMonitor extends AbstractLivelinessMonitor<String> {
+
+  /** Event handler taken from the dispatcher; receives the EXPIRE events. */
+  private final EventHandler dispatcher;
+
+  /**
+   * @param d dispatcher whose event handler will receive session expiry events
+   */
+  public SessionLivelinessMonitor(Dispatcher d) {
+    super(SessionLivelinessMonitor.class.getSimpleName(), new SystemClock());
+    this.dispatcher = d.getEventHandler();
+  }
+
+  /**
+   * Reads the client session expiry time from the configuration and sets the
+   * expire and monitor intervals accordingly.
+   *
+   * @param conf must be a {@link TajoConf}
+   * @throws IllegalArgumentException if {@code conf} is not a {@link TajoConf}
+   */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    Preconditions.checkArgument(conf instanceof TajoConf);
+    TajoConf systemConf = (TajoConf) conf;
+
+    // The configured expiry time is in seconds, whereas AbstractLivelinessMonitor
+    // compares its intervals against a millisecond clock, so convert here.
+    int expireSeconds = systemConf.getIntVar(TajoConf.ConfVars.CLIENT_SESSION_EXPIRY_TIME);
+    // Clamp instead of overflowing when a very large expiry time is configured.
+    int expireMillis = (int) Math.min(expireSeconds * 1000L, (long) Integer.MAX_VALUE);
+    setExpireInterval(expireMillis);
+    // Check three times per expiry period.
+    setMonitorInterval(expireMillis / 3);
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Emits an EXPIRE event for the given session id.
+   *
+   * @param id id of the session that has timed out
+   */
+  @Override
+  protected void expire(String id) {
+    dispatcher.handle(new SessionEvent(id, SessionEventType.EXPIRE));
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
new file mode 100644
index 0000000..24df9d8
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Keeps track of client sessions by session id and reacts to session events.
+ *
+ * Every accessor refreshes the session (last access time plus a ping to the
+ * {@link SessionLivelinessMonitor}) before returning. Lookups use a single
+ * {@code sessions.get} so that a concurrent removal results in an
+ * {@link InvalidSessionException} instead of a NullPointerException.
+ */
+public class SessionManager extends CompositeService implements EventHandler<SessionEvent> {
+  private static final Log LOG = LogFactory.getLog(SessionManager.class);
+
+  public final ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();
+  private final Dispatcher dispatcher;
+  private SessionLivelinessMonitor sessionLivelinessMonitor;
+
+
+  public SessionManager(Dispatcher dispatcher) {
+    super(SessionManager.class.getSimpleName());
+    this.dispatcher = dispatcher;
+  }
+
+  /** Creates the liveliness monitor and registers it as a child service. */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    sessionLivelinessMonitor = new SessionLivelinessMonitor(dispatcher);
+    addIfService(sessionLivelinessMonitor);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  /**
+   * Looks up a session, refreshes its last access time and pings the
+   * liveliness monitor.
+   *
+   * @return the live session, never null
+   * @throws InvalidSessionException if no session exists for {@code sessionId}
+   */
+  private Session touchAndGet(String sessionId) throws InvalidSessionException {
+    // One atomic lookup: a separate containsKey() followed by get() could observe
+    // a removal in between and dereference null.
+    Session session = sessions.get(sessionId);
+    if (session == null) {
+      throw new InvalidSessionException(sessionId);
+    }
+    session.updateLastAccessTime();
+    sessionLivelinessMonitor.receivedPing(sessionId);
+    return session;
+  }
+
+  /**
+   * Creates a session with a random UUID as its id.
+   *
+   * @return the id of the new session
+   * @throws InvalidSessionException if the generated id is already in use
+   */
+  public String createSession(String username, String baseDatabaseName) throws InvalidSessionException {
+    String sessionId = UUID.randomUUID().toString();
+    Session newSession = new Session(sessionId, username, baseDatabaseName);
+    // NOTE(review): the new id is never passed to sessionLivelinessMonitor.register(...)
+    // in this class - confirm that expiry tracking is set up elsewhere.
+    Session oldSession = sessions.putIfAbsent(sessionId, newSession);
+    if (oldSession != null) {
+      throw new InvalidSessionException("Session id is duplicated: " + oldSession.getSessionId());
+    }
+    LOG.info("Session " + sessionId + " is created." );
+    return sessionId;
+  }
+
+  /** Removes the session; logs an error if the id is unknown. */
+  public void removeSession(String sessionId) {
+    // remove() tells us atomically whether there was something to remove.
+    if (sessions.remove(sessionId) != null) {
+      LOG.info("Session " + sessionId + " is removed.");
+    } else {
+      LOG.error("No such session id: " + sessionId);
+    }
+  }
+
+  /** @throws InvalidSessionException if no session exists for {@code sessionId} */
+  public Session getSession(String sessionId) throws InvalidSessionException {
+    return touchAndGet(sessionId);
+  }
+
+  /** @throws InvalidSessionException if no session exists for {@code sessionId} */
+  public void setVariable(String sessionId, String name, String value) throws InvalidSessionException {
+    touchAndGet(sessionId).setVariable(name, value);
+  }
+
+  /** @throws InvalidSessionException if no session exists for {@code sessionId} */
+  public String getVariable(String sessionId, String name)
+      throws InvalidSessionException, NoSuchSessionVariableException {
+    return touchAndGet(sessionId).getVariable(name);
+  }
+
+  /** @throws InvalidSessionException if no session exists for {@code sessionId} */
+  public void removeVariable(String sessionId, String name) throws InvalidSessionException {
+    touchAndGet(sessionId).removeVariable(name);
+  }
+
+  /** @throws InvalidSessionException if no session exists for {@code sessionId} */
+  public Map<String, String> getAllVariables(String sessionId) throws InvalidSessionException {
+    return touchAndGet(sessionId).getAllVariables();
+  }
+
+  /**
+   * Refreshes the session's last access time and pings the liveliness monitor.
+   *
+   * @throws InvalidSessionException if no session exists for {@code sessionId}
+   */
+  public void touch(String sessionId) throws InvalidSessionException {
+    touchAndGet(sessionId);
+  }
+
+  /**
+   * Handles a session event: the session is touched for every event type and
+   * additionally removed when the type is EXPIRE. An unknown session id is
+   * logged and otherwise ignored.
+   */
+  @Override
+  public void handle(SessionEvent event) {
+    LOG.info("Processing " + event.getSessionId() + " of type " + event.getType());
+
+    try {
+      touch(event.getSessionId());
+    } catch (InvalidSessionException e) {
+      LOG.error(e);
+    }
+
+    if (event.getType() == SessionEventType.EXPIRE) {
+      Session session = sessions.remove(event.getSessionId());
+      // The session may already be gone (explicitly removed before the expiry
+      // event was processed); there is nothing left to report in that case.
+      if (session != null) {
+        LOG.info("[Expired] Session username=" + session.getUserName() + ",sessionid=" + event.getSessionId());
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/net/CachedDNSResolver.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/net/CachedDNSResolver.java b/tajo-core/src/main/java/org/apache/tajo/net/CachedDNSResolver.java
new file mode 100644
index 0000000..2a53c47
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/net/CachedDNSResolver.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.net;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Resolves host names to IP address strings and remembers successful lookups
+ * in a process-wide cache.
+ */
+public class CachedDNSResolver {
+  /** Host name to IP address; only successful resolutions are stored. */
+  private static final Map<String, String> hostNameToIPAddrMap
+      = new ConcurrentHashMap<String, String>();
+
+  /**
+   * Resolves a single host name.
+   *
+   * @param hostName host name or textual IP address to resolve
+   * @return the IP address string, or null if the host cannot be resolved
+   */
+  public static String resolve(String hostName) {
+    String cached = hostNameToIPAddrMap.get(hostName);
+    if (cached != null) {
+      return cached;
+    }
+
+    try {
+      String ipAddress = InetAddress.getByName(hostName).getHostAddress();
+      hostNameToIPAddrMap.put(hostName, ipAddress);
+      return ipAddress;
+    } catch (UnknownHostException e) {
+      e.printStackTrace();
+      // Failures are not cached: ConcurrentHashMap rejects null values, and the
+      // host may become resolvable later.
+      return null;
+    }
+  }
+
+  /**
+   * Resolves each host name of the array in order.
+   *
+   * @param hostNames host names to resolve, may be null
+   * @return an array of the same length holding the resolved addresses (null
+   *         entries for unresolvable hosts), or null if {@code hostNames} is null
+   */
+  public static String [] resolve(String [] hostNames) {
+    if (hostNames == null) {
+      return null;
+    }
+
+    String [] resolved = new String[hostNames.length];
+    for (int i = 0; i < hostNames.length; i++) {
+      resolved[i] = resolve(hostNames[i]);
+    }
+    return resolved;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java b/tajo-core/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java
new file mode 100644
index 0000000..1db0c3b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+
+/**
+ * Conversions between Tajo query ids and YARN application (attempt) ids.
+ */
+public class ApplicationIdUtils {
+  /**
+   * Builds an application attempt id from the application id derived from
+   * {@code queryId} and the given attempt number.
+   */
+  public static ApplicationAttemptId createApplicationAttemptId(QueryId queryId, int attemptId) {
+    ApplicationId applicationId = queryIdToAppId(queryId);
+    return BuilderUtils.newApplicationAttemptId(applicationId, attemptId);
+  }
+
+  /** Same as the two-argument variant with attempt number 1. */
+  public static ApplicationAttemptId createApplicationAttemptId(QueryId queryId) {
+    ApplicationId applicationId = queryIdToAppId(queryId);
+    return BuilderUtils.newApplicationAttemptId(applicationId, 1);
+  }
+
+  /**
+   * Builds an application id from the query id: the id string parsed as a long
+   * is the first argument, the sequence number the second.
+   *
+   * @throws NumberFormatException if the query's id string is not a valid long
+   */
+  public static ApplicationId queryIdToAppId(QueryId queryId) {
+    return BuilderUtils.newApplicationId(
+        Long.parseLong(queryId.getId()),
+        queryId.getSeq());
+  }
+
+  /** Builds a query id from the cluster timestamp and id of the application id proto. */
+  public static QueryId appIdToQueryId(YarnProtos.ApplicationIdProto appId) {
+    long clusterTimestamp = appId.getClusterTimestamp();
+    int id = appId.getId();
+    return QueryIdFactory.newQueryId(clusterTimestamp, id);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/ClassUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/ClassUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/ClassUtil.java
new file mode 100644
index 0000000..160b585
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/ClassUtil.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+
+/**
+ * Classpath scanner that finds loadable, non-interface classes matching a given
+ * type, looking into both class directories and jar files.
+ */
+public abstract class ClassUtil {
+  private static final Log LOG = LogFactory.getLog(ClassUtil.class);
+
+  private static final String CLASS_SUFFIX = ".class";
+
+  /**
+   * Scans every existing entry of {@code java.class.path}.
+   *
+   * @param type          class or interface the found classes must be, extend or implement
+   * @param packageFilter substring the qualified class name must contain
+   * @return the matching classes; classes whose simple name starts with "Test"
+   *         and interfaces are never included
+   */
+  public static Set<Class> findClasses(Class type, String packageFilter) {
+    Set<Class> classSet = new HashSet<Class>();
+
+    String classpath = System.getProperty("java.class.path");
+    String[] paths = classpath.split(System.getProperty("path.separator"));
+
+    for (String path : paths) {
+      File file = new File(path);
+      if (file.exists()) {
+        findClasses(classSet, file, file, true, type, packageFilter);
+      }
+    }
+
+    return classSet;
+  }
+
+  /** Recursively walks {@code file}, collecting matches from directories, jars and class files. */
+  private static void findClasses(Set<Class> matchedClassSet, File root, File file, boolean includeJars, Class type, String packageFilter) {
+    if (file.isDirectory()) {
+      File[] children = file.listFiles();
+      // listFiles() returns null when the directory cannot be read.
+      if (children == null) {
+        return;
+      }
+      for (File child : children) {
+        findClasses(matchedClassSet, root, child, includeJars, type, packageFilter);
+      }
+      return;
+    }
+
+    String lowerCaseName = file.getName().toLowerCase();
+    if (lowerCaseName.endsWith(".jar") && includeJars) {
+      findClassesInJar(matchedClassSet, file, type, packageFilter);
+    } else if (lowerCaseName.endsWith(CLASS_SUFFIX)) {
+      addIfMatched(matchedClassSet, createClassName(root, file), type, packageFilter);
+    }
+  }
+
+  /** Inspects every class entry of a jar file; the jar is always closed afterwards. */
+  private static void findClassesInJar(Set<Class> matchedClassSet, File file, Class type, String packageFilter) {
+    JarFile jar;
+    try {
+      jar = new JarFile(file);
+    } catch (Exception ex) {
+      LOG.error(ex.getMessage(), ex);
+      return;
+    }
+
+    try {
+      Enumeration<JarEntry> entries = jar.entries();
+      while (entries.hasMoreElements()) {
+        String name = entries.nextElement().getName();
+        // Only real class entries; ".class" somewhere in the middle of a name does not count.
+        if (name.endsWith(CLASS_SUFFIX) && name.length() > CLASS_SUFFIX.length()) {
+          String qualifiedClassName =
+              name.substring(0, name.length() - CLASS_SUFFIX.length()).replace("/", ".");
+          addIfMatched(matchedClassSet, qualifiedClassName, type, packageFilter);
+        }
+      }
+    } finally {
+      try {
+        jar.close();
+      } catch (java.io.IOException e) {
+        LOG.warn(e.getMessage(), e);
+      }
+    }
+  }
+
+  /** Loads the named class and adds it to the set if it passes the package, test-class and type filters. */
+  private static void addIfMatched(Set<Class> matchedClassSet, String qualifiedClassName, Class type, String packageFilter) {
+    if (qualifiedClassName.indexOf(packageFilter) < 0 || isTestClass(qualifiedClassName)) {
+      return;
+    }
+    try {
+      Class clazz = Class.forName(qualifiedClassName);
+      if (!clazz.isInterface() && isMatch(type, clazz)) {
+        matchedClassSet.add(clazz);
+      }
+    } catch (ClassNotFoundException e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /** A class counts as a test class when its simple name starts with "Test". */
+  private static boolean isTestClass(String qualifiedClassName) {
+    String className = getClassName(qualifiedClassName);
+    if(className == null) {
+      return false;
+    }
+
+    return className.startsWith("Test");
+  }
+
+  /** True if {@code loadedClass} is {@code targetClass} or has it among its interfaces or superclasses. */
+  private static boolean isMatch(Class targetClass, Class loadedClass) {
+    if (targetClass.equals(loadedClass)) {
+      return true;
+    }
+
+    Class[] classInterfaces = loadedClass.getInterfaces();
+    if (classInterfaces != null) {
+      for (Class eachInterfaceClass : classInterfaces) {
+        if (isMatch(targetClass, eachInterfaceClass)) {
+          return true;
+        }
+      }
+    }
+
+    Class superClass = loadedClass.getSuperclass();
+    return superClass != null && isMatch(targetClass, superClass);
+  }
+
+  /** Returns the last dot-separated token of the qualified name. */
+  private static String getClassName(String qualifiedClassName) {
+    String[] tokens = qualifiedClassName.split("\\.");
+    if (tokens.length == 0) {
+      return qualifiedClassName;
+    }
+    return tokens[tokens.length - 1];
+  }
+
+  /** Derives the qualified class name of a class file from its path relative to {@code root}. */
+  private static String createClassName(File root, File file) {
+    StringBuilder sb = new StringBuilder();
+    String fileName = file.getName();
+    // The caller matched the suffix case-insensitively, so strip it by length
+    // rather than searching for the lower-case literal.
+    sb.append(fileName.substring(0, fileName.length() - CLASS_SUFFIX.length()));
+    file = file.getParentFile();
+    while (file != null && !file.equals(root)) {
+      sb.insert(0, '.').insert(0, file.getName());
+      file = file.getParentFile();
+    }
+    return sb.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/GeoIPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/GeoIPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/GeoIPUtil.java
new file mode 100644
index 0000000..859b37d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/GeoIPUtil.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import com.maxmind.geoip.LookupService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+
+import java.io.IOException;
+
+/**
+ * Country lookup backed by a GeoIP database whose location is read from
+ * {@code ConfVars.GEOIP_DATA} when the class is loaded.
+ */
+public class GeoIPUtil {
+  private static final Log LOG = LogFactory.getLog(GeoIPUtil.class);
+  /** Stays null when the database could not be opened during class initialization. */
+  private static LookupService lookup;
+
+  static {
+    try {
+      TajoConf conf = new TajoConf();
+      lookup = new LookupService(conf.getVar(ConfVars.GEOIP_DATA),
+          LookupService.GEOIP_MEMORY_CACHE);
+    } catch (IOException e) {
+      LOG.error("Cannot open the geoip data", e);
+    }
+  }
+
+  /**
+   * Returns the country code the lookup service reports for {@code host}.
+   *
+   * @param host host to look up
+   * @return the code of the country returned by the lookup service
+   * @throws IllegalStateException if the GeoIP database could not be opened
+   *         when this class was initialized
+   */
+  public static String getCountryCode(String host) {
+    // Fail with an explanation rather than a bare NullPointerException when the
+    // static initializer was unable to open the database.
+    if (lookup == null) {
+      throw new IllegalStateException("GeoIP lookup is unavailable: cannot open the geoip data");
+    }
+    return lookup.getCountry(host).getCode();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
new file mode 100644
index 0000000..8816f8f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import com.google.gson.Gson;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.logical.IndexScanNode;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+
+/**
+ * Helpers for naming indexes and for turning a scan into an index scan when a
+ * suitable index exists.
+ */
+public class IndexUtil {
+  /**
+   * Builds an index name from the fragment's file name, start key and end key
+   * followed by the simple names of the sort key columns.
+   */
+  public static String getIndexNameOfFrag(FileFragment fragment, SortSpec[] keys) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(fragment.getPath().getName()).append("_");
+    builder.append(fragment.getStartKey()).append("_").append(fragment.getEndKey()).append("_");
+    for (SortSpec key : keys) {
+      builder.append(key.getSortKey().getSimpleName()).append("_");
+    }
+    builder.append("_index");
+    return builder.toString();
+  }
+
+  /** Builds an index name from {@code indexName} followed by the simple names of the sort key columns. */
+  public static String getIndexName(String indexName , SortSpec[] keys) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(indexName).append("_");
+    for (SortSpec key : keys) {
+      builder.append(key.getSortKey().getSimpleName()).append("_");
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Picks the index with the most sort keys whose columns are all constrained
+   * by a collected {@code field = const} or {@code field IS NULL} node of the
+   * scan qualifier, and builds an index scan for it.
+   *
+   * @param plan     plan used to allocate the id of the new node
+   * @param scanNode scan whose qualifier is inspected
+   * @param iter     entries whose values are JSON-serialized {@code SortSpec[]} index keys
+   * @return the index scan node, or null if the scan has no qualifier or no index is usable
+   */
+  public static IndexScanNode indexEval(LogicalPlan plan, ScanNode scanNode,
+      Iterator<Entry<String, String>> iter ) {
+
+    EvalNode qual = scanNode.getQual();
+    if (qual == null) {
+      // Without a predicate there is nothing an index could be matched against.
+      return null;
+    }
+    Gson gson = CoreGsonHelper.getInstance();
+
+    FieldAndValueFinder nodeFinder = new FieldAndValueFinder();
+    qual.preOrder(nodeFinder);
+    LinkedList<EvalNode> nodeList = nodeFinder.getNodeList();
+
+    SortSpec[] maxIndex = null;
+    while (iter.hasNext()) {
+      SortSpec[] sortKey = gson.fromJson(iter.next().getValue(), SortSpec[].class);
+      if (sortKey.length > nodeList.size()) {
+        // An index with more sort keys than collected conditions cannot be used.
+        continue;
+      }
+      // Keep the first usable index with the largest number of sort keys.
+      if (coversAllKeys(sortKey, nodeList)
+          && (maxIndex == null || maxIndex.length < sortKey.length)) {
+        maxIndex = sortKey;
+      }
+    }
+    if (maxIndex == null) {
+      return null;
+    }
+
+    Schema keySchema = new Schema();
+    for (SortSpec spec : maxIndex) {
+      keySchema.addColumn(spec.getSortKey());
+    }
+    // NOTE(review): the values follow the order of the collected conditions, not
+    // the order of the chosen index's keys - confirm this is what IndexScanNode expects.
+    Datum[] datum = new Datum[nodeList.size()];
+    int position = 0;
+    for (EvalNode node : nodeList) {
+      datum[position++] = ((ConstEval) node.getRightExpr()).getValue();
+    }
+
+    return new IndexScanNode(plan.newPID(), scanNode, keySchema , datum , maxIndex);
+  }
+
+  /** True if every sort key column equals the left-hand column of at least one collected node. */
+  private static boolean coversAllKeys(SortSpec[] sortKey, LinkedList<EvalNode> nodes) {
+    for (SortSpec spec : sortKey) {
+      boolean found = false;
+      // Iterate rather than index: LinkedList.get(i) walks the list on every call.
+      for (EvalNode node : nodes) {
+        Column col = ((FieldEval) node.getLeftExpr()).getColumnRef();
+        if (col.equals(spec.getSortKey())) {
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+
+  /**
+   * Visitor collecting the EQUAL and IS_NULL nodes whose left side is a field
+   * and whose right side is a constant.
+   */
+  private static class FieldAndValueFinder implements EvalNodeVisitor {
+    private final LinkedList<EvalNode> nodeList = new LinkedList<EvalNode>();
+
+    public LinkedList<EvalNode> getNodeList () {
+      return this.nodeList;
+    }
+
+    @Override
+    public void visit(EvalNode node) {
+      switch(node.getType()) {
+        case EQUAL:
+        case IS_NULL:
+          if (node.getLeftExpr().getType() == EvalType.FIELD
+              && node.getRightExpr().getType() == EvalType.CONST) {
+            nodeList.add(node);
+          }
+          break;
+        default:
+          // AND and every other node type contribute nothing themselves.
+          break;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
new file mode 100644
index 0000000..58a3550
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.worker.TaskRunner;
+
+import java.text.DecimalFormat;
+import java.util.*;
+
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+
+/**
+ * Sorting and formatting helpers used by the web UI pages.
+ *
+ * The shared {@link DecimalFormat} instances are only used while holding their
+ * monitor, because DecimalFormat is not safe for concurrent use.
+ */
+public class JSPUtil {
+  static DecimalFormat decimalF = new DecimalFormat("###.0");
+
+  /**
+   * Sorts the query units in place.
+   *
+   * @param sortField "id", "host", "runTime" or "startTime"; null, empty or any
+   *                  other value sorts by id
+   * @param sortOrder "asc" for ascending order, anything else for descending
+   */
+  public static void sortQueryUnit(QueryUnit[] queryUnits, String sortField, String sortOrder) {
+    if(sortField == null || sortField.isEmpty()) {
+      sortField = "id";
+    }
+
+    Arrays.sort(queryUnits, new QueryUnitComparator(sortField, "asc".equals(sortOrder)));
+  }
+
+  /** Sorts the task runners in place by id. */
+  public static void sortTaskRunner(List<TaskRunner> taskRunners) {
+    Collections.sort(taskRunners, new Comparator<TaskRunner>() {
+      @Override
+      public int compare(TaskRunner taskRunner, TaskRunner taskRunner2) {
+        return taskRunner.getId().compareTo(taskRunner2.getId());
+      }
+    });
+  }
+
+  /**
+   * Formats the elapsed time between the two timestamps as seconds.
+   *
+   * @param startTime  start in milliseconds; 0 means "not started"
+   * @param finishTime finish in milliseconds; 0 means "still running", in which
+   *                   case the current time is used
+   * @return "-" if not started, otherwise the elapsed seconds followed by " sec"
+   */
+  public static String getElapsedTime(long startTime, long finishTime) {
+    if(startTime == 0) {
+      return "-";
+    }
+    long endTime = finishTime == 0 ? System.currentTimeMillis() : finishTime;
+    // NOTE(review): integer division truncates to whole seconds, so the fraction
+    // digit printed by the "###.0" pattern is always 0 - confirm this is intended.
+    long elapsedSeconds = (endTime - startTime) / 1000;
+    DecimalFormat format = decimalF;
+    synchronized (format) {
+      return format.format(elapsedSeconds) + " sec";
+    }
+  }
+
+  /**
+   * Combines the host of the master umbilical RPC address with the port of the
+   * master info address.
+   *
+   * @return "host:port", or the exception message if anything goes wrong
+   */
+  public static String getTajoMasterHttpAddr(Configuration config) {
+    try {
+      TajoConf conf = (TajoConf) config;
+      String [] masterAddr = conf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS).split(":");
+      return masterAddr[0] + ":" + conf.getVar(ConfVars.TAJO_MASTER_INFO_ADDRESS).split(":")[1];
+    } catch (Exception e) {
+      e.printStackTrace();
+      return e.getMessage();
+    }
+  }
+
+  /** Returns a new list of the tasks ordered by the string form of their query id. */
+  public static List<QueryMasterTask> sortQueryMasterTask(Collection<QueryMasterTask> queryMasterTasks,
+      final boolean desc) {
+    List<QueryMasterTask> queryMasterTaskList = new ArrayList<QueryMasterTask>(queryMasterTasks);
+
+    Collections.sort(queryMasterTaskList, new Comparator<QueryMasterTask>() {
+
+      @Override
+      public int compare(QueryMasterTask task1, QueryMasterTask task2) {
+        if(desc) {
+          return task2.getQueryId().toString().compareTo(task1.getQueryId().toString());
+        } else {
+          return task1.getQueryId().toString().compareTo(task2.getQueryId().toString());
+        }
+      }
+    });
+
+    return queryMasterTaskList;
+  }
+
+  /** Returns a new list of the queries ordered by the string form of their query id. */
+  public static List<QueryInProgress> sortQueryInProgress(Collection<QueryInProgress> queryInProgresses,
+      final boolean desc) {
+    List<QueryInProgress> queryProgressList = new ArrayList<QueryInProgress>(queryInProgresses);
+
+    Collections.sort(queryProgressList, new Comparator<QueryInProgress>() {
+      @Override
+      public int compare(QueryInProgress query1, QueryInProgress query2) {
+        if(desc) {
+          return query2.getQueryId().toString().compareTo(query1.getQueryId().toString());
+        } else {
+          return query1.getQueryId().toString().compareTo(query2.getQueryId().toString());
+        }
+      }
+    });
+
+    return queryProgressList;
+  }
+
+  /**
+   * Returns a new list of the sub queries ordered by start time, with sub
+   * queries that have not started (start time 0) last; ties are broken by the
+   * string form of the id.
+   */
+  public static List<SubQuery> sortSubQuery(Collection<SubQuery> subQueries) {
+    List<SubQuery> subQueryList = new ArrayList<SubQuery>(subQueries);
+    Collections.sort(subQueryList, new Comparator<SubQuery>() {
+      @Override
+      public int compare(SubQuery subQuery1, SubQuery subQuery2) {
+        long q1StartTime = subQuery1.getStartTime();
+        long q2StartTime = subQuery2.getStartTime();
+
+        q1StartTime = (q1StartTime == 0 ? Long.MAX_VALUE : q1StartTime);
+        q2StartTime = (q2StartTime == 0 ? Long.MAX_VALUE : q2StartTime);
+
+        int result = compareLong(q1StartTime, q2StartTime);
+        if (result == 0) {
+          return subQuery1.getId().toString().compareTo(subQuery2.getId().toString());
+        } else {
+          return result;
+        }
+      }
+    });
+
+    return subQueryList;
+  }
+
+  /** Orders query units by the configured field, ascending or descending. */
+  static class QueryUnitComparator implements Comparator<QueryUnit> {
+    private String sortField;
+    private boolean asc;
+    public QueryUnitComparator(String sortField, boolean asc) {
+      this.sortField = sortField;
+      this.asc = asc;
+    }
+
+    @Override
+    public int compare(QueryUnit queryUnit, QueryUnit queryUnit2) {
+      if (asc) {
+        return compareAscending(queryUnit, queryUnit2);
+      }
+
+      if ("runTime".equals(sortField)) {
+        // In descending run-time order, units that were never launched go last.
+        // The decision is made only when exactly one side is unlaunched: answering
+        // -1 for two unlaunched units in both directions would violate the
+        // Comparator contract and can make the sort throw.
+        boolean notLaunched = queryUnit.getLaunchTime() == 0;
+        boolean notLaunched2 = queryUnit2.getLaunchTime() == 0;
+        if (notLaunched != notLaunched2) {
+          return notLaunched2 ? -1 : 1;
+        }
+      }
+      // Descending order is the ascending comparison with the operands swapped.
+      return compareAscending(queryUnit2, queryUnit);
+    }
+
+    /** Ascending comparison of two units by the configured sort field. */
+    private int compareAscending(QueryUnit first, QueryUnit second) {
+      if ("host".equals(sortField)) {
+        String host1 = first.getSucceededHost() == null ? "-" : first.getSucceededHost();
+        String host2 = second.getSucceededHost() == null ? "-" : second.getSucceededHost();
+        return host1.compareTo(host2);
+      } else if ("runTime".equals(sortField)) {
+        return compareLong(first.getRunningTime(), second.getRunningTime());
+      } else if ("startTime".equals(sortField)) {
+        return compareLong(first.getLaunchTime(), second.getLaunchTime());
+      } else {
+        // "id" and every unknown field sort by id.
+        return first.getId().compareTo(second.getId());
+      }
+    }
+  }
+
+  /** Three-way comparison of two longs: 1, -1 or 0. */
+  static int compareLong(long a, long b) {
+    if(a > b) {
+      return 1;
+    } else if(a < b) {
+      return -1;
+    } else {
+      return 0;
+    }
+  }
+
+  /** Sorts the functions in place by signature, then by return type. */
+  public static void sortFunctionDesc(List<FunctionDesc> functions) {
+    Collections.sort(functions, new java.util.Comparator<FunctionDesc>() {
+      @Override
+      public int compare(FunctionDesc f1, FunctionDesc f2) {
+        int nameCompared = f1.getSignature().compareTo(f2.getSignature());
+        if(nameCompared != 0) {
+          return nameCompared;
+        } else {
+          return f1.getReturnType().getType().compareTo(f2.getReturnType().getType());
+        }
+      }
+    });
+  }
+
+  static final DecimalFormat PERCENT_FORMAT = new DecimalFormat("###.0");
+
+  /** Formats a ratio as a percentage number with one fraction digit (no percent sign). */
+  public static String percentFormat(float value) {
+    synchronized (PERCENT_FORMAT) {
+      return PERCENT_FORMAT.format(value * 100.0f);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/GroupNameMetricsFilter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/GroupNameMetricsFilter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/GroupNameMetricsFilter.java
new file mode 100644
index 0000000..a273475
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/GroupNameMetricsFilter.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+
+public class GroupNameMetricsFilter implements MetricFilter {
+ String groupName;
+
+ public GroupNameMetricsFilter(String groupName) {
+ this.groupName = groupName;
+ }
+ @Override
+ public boolean matches(String name, Metric metric) {
+ if(name != null) {
+ String[] tokens = name.split("\\.");
+ if(groupName.equals(tokens[0])) {
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java
new file mode 100644
index 0000000..6e130ff
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class LogEventGaugeSet implements MetricSet {
+
+ @Override
+ public Map<String, Metric> getMetrics() {
+ final Map<String, Metric> gauges = new HashMap<String, Metric>();
+
+ gauges.put("Fatal", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return TajoLogEventCounter.getFatal();
+ }
+ });
+
+ gauges.put("Error", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return TajoLogEventCounter.getError();
+ }
+ });
+
+ gauges.put("Warn", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return TajoLogEventCounter.getWarn();
+ }
+ });
+
+ gauges.put("Info", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return TajoLogEventCounter.getInfo();
+ }
+ });
+
+ return gauges;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/MetricsFilterList.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/MetricsFilterList.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/MetricsFilterList.java
new file mode 100644
index 0000000..b2fc6e4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/MetricsFilterList.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MetricsFilterList implements MetricFilter {
+ List<MetricFilter> filters = new ArrayList<MetricFilter>();
+
+ public void addMetricFilter(MetricFilter filter) {
+ filters.add(filter);
+ }
+
+ @Override
+ public boolean matches(String name, Metric metric) {
+ for (MetricFilter eachFilter: filters) {
+ if (!eachFilter.matches(name, metric)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java
new file mode 100644
index 0000000..4faa3e7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * A {@link MetricFilter} that accepts a metric when its name contains a match for at least one
+ * of the configured regular expressions. A filter without expressions accepts everything.
+ */
+public class RegexpMetricsFilter implements MetricFilter {
+  List<Pattern> filterPatterns = new ArrayList<Pattern>();
+
+  /**
+   * Compiles each expression once up front.
+   *
+   * @param filterExpressions regular expressions to search for in metric names; {@code null} is
+   *                          treated like an empty collection
+   * @throws java.util.regex.PatternSyntaxException if an expression is not a valid pattern
+   */
+  public RegexpMetricsFilter(Collection<String> filterExpressions) {
+    if (filterExpressions == null) {
+      return;
+    }
+    for (String eachExpression : filterExpressions) {
+      filterPatterns.add(Pattern.compile(eachExpression));
+    }
+  }
+
+  /**
+   * @param name   the metric name to test
+   * @param metric the metric itself (not inspected)
+   * @return {@code true} if no patterns are configured, or if any pattern is found somewhere in
+   *         {@code name}; {@code false} otherwise, including for a {@code null} name when
+   *         patterns are configured
+   */
+  @Override
+  public boolean matches(String name, Metric metric) {
+    if (filterPatterns.isEmpty()) {
+      return true;
+    }
+    // Pattern.matcher(null) throws; a nameless metric simply cannot match any expression.
+    if (name == null) {
+      return false;
+    }
+
+    for (Pattern eachPattern : filterPatterns) {
+      // find() looks for the pattern anywhere in the name, not a whole-string match.
+      if (eachPattern.matcher(name).find()) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoLogEventCounter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoLogEventCounter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoLogEventCounter.java
new file mode 100644
index 0000000..3e44b02
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoLogEventCounter.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
+
+public class TajoLogEventCounter extends AppenderSkeleton {
+ private static final int FATAL = 0;
+ private static final int ERROR = 1;
+ private static final int WARN = 2;
+ private static final int INFO = 3;
+
+ private static class EventCounts {
+
+ private final long[] counts = {0, 0, 0, 0};
+
+ private synchronized void incr(int i) {
+ ++counts[i];
+ }
+
+ private synchronized long get(int i) {
+ return counts[i];
+ }
+ }
+
+ private static EventCounts counts = new EventCounts();
+
+ public static long getFatal() {
+ return counts.get(FATAL);
+ }
+
+ public static long getError() {
+ return counts.get(ERROR);
+ }
+
+ public static long getWarn() {
+ return counts.get(WARN);
+ }
+
+ public static long getInfo() {
+ return counts.get(INFO);
+ }
+
+ @Override
+ public void append(LoggingEvent event) {
+ Level level = event.getLevel();
+ String levelStr = level.toString();
+
+ if (level == Level.INFO || "INFO".equalsIgnoreCase(levelStr)) {
+ counts.incr(INFO);
+ } else if (level == Level.WARN || "WARN".equalsIgnoreCase(levelStr)) {
+ counts.incr(WARN);
+ } else if (level == Level.ERROR || "ERROR".equalsIgnoreCase(levelStr)) {
+ counts.incr(ERROR);
+ } else if (level == Level.FATAL || "FATAL".equalsIgnoreCase(levelStr)) {
+ counts.incr(FATAL);
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public boolean requiresLayout() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoMetrics.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoMetrics.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoMetrics.java
new file mode 100644
index 0000000..0e378b2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoMetrics.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.metrics.reporter.TajoMetricsReporter;
+
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TajoMetrics {
+ private static final Log LOG = LogFactory.getLog(TajoMetrics.class);
+
+ protected MetricRegistry metricRegistry;
+ protected AtomicBoolean stop = new AtomicBoolean(false);
+ protected String metricsGroupName;
+
+ public TajoMetrics(String metricsGroupName) {
+ this.metricsGroupName = metricsGroupName;
+ this.metricRegistry = new MetricRegistry();
+ }
+
+ public void stop() {
+ stop.set(true);
+ }
+
+ public MetricRegistry getRegistry() {
+ return metricRegistry;
+ }
+
+ public void report(TajoMetricsReporter reporter) {
+ try {
+ reporter.report(metricRegistry.getGauges(),
+ metricRegistry.getCounters(),
+ metricRegistry.getHistograms(),
+ metricRegistry.getMeters(),
+ metricRegistry.getTimers());
+ } catch (Exception e) {
+ if(LOG.isDebugEnabled()) {
+ LOG.warn("Metric report error:" + e.getMessage(), e);
+ } else {
+ LOG.warn("Metric report error:" + e.getMessage());
+ }
+ }
+ }
+
+ public Map<String, Metric> getMetrics() {
+ return metricRegistry.getMetrics();
+ }
+
+ public SortedMap<String, Gauge> getGuageMetrics(MetricFilter filter) {
+ if(filter == null) {
+ filter = MetricFilter.ALL;
+ }
+ return metricRegistry.getGauges(filter);
+ }
+
+ public SortedMap<String, Counter> getCounterMetrics(MetricFilter filter) {
+ if(filter == null) {
+ filter = MetricFilter.ALL;
+ }
+ return metricRegistry.getCounters(filter);
+ }
+
+ public SortedMap<String, Histogram> getHistogramMetrics(MetricFilter filter) {
+ if(filter == null) {
+ filter = MetricFilter.ALL;
+ }
+ return metricRegistry.getHistograms(filter);
+ }
+
+ public SortedMap<String, Meter> getMeterMetrics(MetricFilter filter) {
+ if(filter == null) {
+ filter = MetricFilter.ALL;
+ }
+ return metricRegistry.getMeters(filter);
+ }
+
+ public SortedMap<String, Timer> getTimerMetrics(MetricFilter filter) {
+ if(filter == null) {
+ filter = MetricFilter.ALL;
+ }
+ return metricRegistry.getTimers(filter);
+ }
+
+ public void register(String contextName, MetricSet metricSet) {
+ metricRegistry.register(MetricRegistry.name(metricsGroupName, contextName), metricSet);
+ }
+
+ public void register(String contextName, String itemName, Gauge gauge) {
+ metricRegistry.register(makeMetricsName(metricsGroupName, contextName, itemName), gauge);
+ }
+
+ public Counter counter(String contextName, String itemName) {
+ return metricRegistry.counter(makeMetricsName(metricsGroupName, contextName, itemName));
+ }
+
+ public Histogram histogram(String contextName, String itemName) {
+ return metricRegistry.histogram(makeMetricsName(metricsGroupName, contextName, itemName));
+ }
+
+ public Meter meter(String contextName, String itemName) {
+ return metricRegistry.meter(makeMetricsName(metricsGroupName, contextName, itemName));
+ }
+
+ public Timer timer(String contextName, String itemName) {
+ return metricRegistry.timer(makeMetricsName(metricsGroupName, contextName, itemName));
+ }
+
+ public static String makeMetricsName(String metricsGroupName, String contextName, String itemName) {
+ return MetricRegistry.name(metricsGroupName, contextName, itemName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java
new file mode 100644
index 0000000..4192ca0
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.jvm.FileDescriptorRatioGauge;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.event.ConfigurationEvent;
+import org.apache.commons.configuration.event.ConfigurationListener;
+import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.metrics.reporter.TajoMetricsScheduledReporter;
+
+import java.util.*;
+
+/**
+ * {@link TajoMetrics} variant that starts the metrics reporters named in a properties file and
+ * restarts them when that file's configuration is reloaded.
+ */
+public class TajoSystemMetrics extends TajoMetrics {
+  private static final Log LOG = LogFactory.getLog(TajoSystemMetrics.class);
+
+  /** Reloadable reporter configuration; stays {@code null} when the properties file cannot be loaded. */
+  private PropertiesConfiguration metricsProps;
+
+  /** Periodically reads a property so that reloads are triggered; {@code null} when there is nothing to poll. */
+  private Thread propertyChangeChecker;
+
+  private String hostAndPort;
+
+  /** Running reporters; every access is synchronized on the list itself. */
+  private List<TajoMetricsScheduledReporter> metricsReporters = new ArrayList<TajoMetricsScheduledReporter>();
+
+  /** Whether the JVM metric sets have already been registered by {@link #start()}. */
+  private boolean inited = false;
+
+  private String metricsPropertyFileName;
+
+  /**
+   * Loads the metrics properties file named by {@code METRICS_PROPERTY_FILENAME} and, if that
+   * succeeds, starts a background thread that polls it for changes. A load failure is logged
+   * and leaves this instance without reporter configuration.
+   *
+   * @param tajoConf         configuration providing the properties file name
+   * @param metricsGroupName group name used as metric name prefix and as property key prefix
+   * @param hostAndPort      passed through to each reporter's {@code init}
+   */
+  public TajoSystemMetrics(TajoConf tajoConf, String metricsGroupName, String hostAndPort) {
+    super(metricsGroupName);
+
+    this.hostAndPort = hostAndPort;
+    try {
+      this.metricsPropertyFileName = tajoConf.getVar(TajoConf.ConfVars.METRICS_PROPERTY_FILENAME);
+      this.metricsProps = new PropertiesConfiguration(metricsPropertyFileName);
+      this.metricsProps.addConfigurationListener(new MetricsReloadListener());
+      FileChangedReloadingStrategy reloadingStrategy = new FileChangedReloadingStrategy();
+      reloadingStrategy.setRefreshDelay(5 * 1000);
+      this.metricsProps.setReloadingStrategy(reloadingStrategy);
+    } catch (ConfigurationException e) {
+      LOG.warn(e.getMessage(), e);
+    }
+
+    if (metricsProps == null) {
+      // The properties file could not be loaded, so there is nothing to poll; starting the
+      // checker thread would only make it die on a NullPointerException.
+      return;
+    }
+
+    // PropertiesConfiguration fires configurationChanged only after a getXXX() call,
+    // so getXXX() needs to be called periodically.
+    propertyChangeChecker = new Thread() {
+      @Override
+      public void run() {
+        while (!stop.get()) {
+          metricsProps.getString("reporter.file");
+          try {
+            Thread.sleep(10 * 1000);
+          } catch (InterruptedException e) {
+            // stop() interrupts this thread after raising the stop flag: finish right away.
+            Thread.currentThread().interrupt();
+            return;
+          }
+        }
+      }
+    };
+
+    propertyChangeChecker.start();
+  }
+
+  /**
+   * @return an unmodifiable snapshot of the currently running reporters; later starts or stops
+   *         are not reflected in the returned collection
+   */
+  public Collection<TajoMetricsScheduledReporter> getMetricsReporters() {
+    synchronized (metricsReporters) {
+      // Copy under the lock: a view of the live list could be iterated by the caller while
+      // another thread clears or refills it.
+      return Collections.unmodifiableCollection(
+          new ArrayList<TajoMetricsScheduledReporter>(metricsReporters));
+    }
+  }
+
+  /** Raises the stop flag, interrupts the polling thread (if any) and closes all reporters. */
+  @Override
+  public void stop() {
+    super.stop();
+    if (propertyChangeChecker != null) {
+      propertyChangeChecker.interrupt();
+    }
+    stopAndClearReporter();
+  }
+
+  /** Closes every running reporter and empties the reporter list. */
+  protected void stopAndClearReporter() {
+    synchronized (metricsReporters) {
+      for (TajoMetricsScheduledReporter eachReporter : metricsReporters) {
+        eachReporter.close();
+      }
+
+      metricsReporters.clear();
+    }
+  }
+
+  /**
+   * Starts the reporters configured for the group and for its {@code "-jvm"} companion group,
+   * and on the first call registers the Heap, File, GC, Thread and Log metric sets under the
+   * {@code "-jvm"} group name.
+   */
+  public void start() {
+    setMetricsReporter(metricsGroupName);
+
+    String jvmMetricsName = metricsGroupName + "-jvm";
+    setMetricsReporter(jvmMetricsName);
+
+    if (!inited) {
+      metricRegistry.register(MetricRegistry.name(jvmMetricsName, "Heap"), new MemoryUsageGaugeSet());
+      metricRegistry.register(MetricRegistry.name(jvmMetricsName, "File"), new FileDescriptorRatioGauge());
+      metricRegistry.register(MetricRegistry.name(jvmMetricsName, "GC"), new GarbageCollectorMetricSet());
+      metricRegistry.register(MetricRegistry.name(jvmMetricsName, "Thread"), new ThreadStatesGaugeSet());
+      metricRegistry.register(MetricRegistry.name(jvmMetricsName, "Log"), new LogEventGaugeSet());
+    }
+    inited = true;
+  }
+
+  /**
+   * Instantiates, initializes and starts each reporter listed in {@code <groupName>.reporters}.
+   * Reporter classes are looked up from the {@code reporter.<name>} properties; a reporter that
+   * is undefined, of the wrong type, or fails to start is logged and skipped.
+   */
+  private void setMetricsReporter(String groupName) {
+    if (metricsProps == null) {
+      LOG.warn("Metrics properties " + metricsPropertyFileName + " are not loaded; no reporter started for "
+          + groupName);
+      return;
+    }
+
+    //reporter name -> class name
+    Map<String, String> reporters = new HashMap<String, String>();
+
+    List<String> reporterNames = metricsProps.getList(groupName + ".reporters");
+    if (reporterNames.isEmpty()) {
+      LOG.warn("No property " + groupName + ".reporters in " + metricsPropertyFileName);
+      return;
+    }
+
+    Map<String, String> allReporterProperties = new HashMap<String, String>();
+
+    Iterator<String> keys = metricsProps.getKeys();
+    while (keys.hasNext()) {
+      String key = keys.next();
+      String value = metricsProps.getString(key);
+      if (key.startsWith("reporter.")) {
+        // "reporter.<name>" = implementing class
+        String[] tokens = key.split("\\.");
+        if (tokens.length == 2) {
+          reporters.put(tokens[1], value);
+        }
+      } else if (key.startsWith(groupName + ".")) {
+        // keys with at least three segments are kept as per-reporter settings
+        String[] tokens = key.split("\\.");
+        if (tokens.length > 2) {
+          allReporterProperties.put(key, value);
+        }
+      }
+    }
+
+    synchronized (metricsReporters) {
+      for (String eachReporterName : reporterNames) {
+        if ("null".equals(eachReporterName)) {
+          continue;
+        }
+        String reporterClass = reporters.get(eachReporterName);
+        if (reporterClass == null) {
+          LOG.warn("No metrics reporter definition[" + eachReporterName + "] in " + metricsPropertyFileName);
+          continue;
+        }
+
+        Map<String, String> eachMetricsReporterProperties = findMetircsProperties(allReporterProperties,
+            groupName + "." + eachReporterName);
+
+        try {
+          Object reporterObject = Class.forName(reporterClass).newInstance();
+          if (!(reporterObject instanceof TajoMetricsScheduledReporter)) {
+            LOG.warn(reporterClass + " is not subclass of " + TajoMetricsScheduledReporter.class.getCanonicalName());
+            continue;
+          }
+          TajoMetricsScheduledReporter reporter = (TajoMetricsScheduledReporter) reporterObject;
+          reporter.init(metricRegistry, groupName, hostAndPort, eachMetricsReporterProperties);
+          reporter.start();
+
+          metricsReporters.add(reporter);
+          LOG.info("Started metrics reporter " + reporter.getClass().getCanonicalName() + " for " + groupName);
+        } catch (ClassNotFoundException e) {
+          LOG.warn("No metrics reporter class[" + eachReporterName + "], required class= " + reporterClass);
+        } catch (Exception e) {
+          LOG.warn("Can't initiate metrics reporter class[" + eachReporterName + "]" + e.getMessage() , e);
+        }
+      }
+    }
+  }
+
+  /** Returns the entries of {@code allReporterProperties} whose key starts with {@code findKey}. */
+  private Map<String, String> findMetircsProperties(Map<String, String> allReporterProperties, String findKey) {
+    Map<String, String> metricsProperties = new HashMap<String, String>();
+
+    for (Map.Entry<String, String> entry : allReporterProperties.entrySet()) {
+      String eachKey = entry.getKey();
+      if (eachKey.startsWith(findKey)) {
+        metricsProperties.put(eachKey, entry.getValue());
+      }
+    }
+    return metricsProperties;
+  }
+
+  /** Restarts all reporters once a configuration update has been applied. */
+  class MetricsReloadListener implements ConfigurationListener {
+    @Override
+    public synchronized void configurationChanged(ConfigurationEvent event) {
+      if (!event.isBeforeUpdate()) {
+        stopAndClearReporter();
+        start();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/GangliaReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/GangliaReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/GangliaReporter.java
new file mode 100644
index 0000000..b9acf0e
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/GangliaReporter.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+import info.ganglia.gmetric4j.gmetric.GMetric;
+import info.ganglia.gmetric4j.gmetric.GMetricSlope;
+import info.ganglia.gmetric4j.gmetric.GMetricType;
+import info.ganglia.gmetric4j.gmetric.GangliaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.SortedMap;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+public class GangliaReporter extends TajoMetricsScheduledReporter {
+ private static final Logger LOG = LoggerFactory.getLogger(GangliaReporter.class);
+ public static final String REPORTER_NAME = "ganglia";
+
+ private GMetric ganglia;
+ private String prefix;
+ private int tMax = 60;
+ private int dMax = 0;
+
+ @Override
+ protected String getReporterName() {
+ return REPORTER_NAME;
+ }
+
+ @Override
+ protected void afterInit() {
+ String server = metricsProperties.get(metricsPropertyKey + "server");
+ String port = metricsProperties.get(metricsPropertyKey + "port");
+
+ if(server == null || server.isEmpty()) {
+ LOG.warn("No " + metricsPropertyKey + "server property in tajo-metrics.properties");
+ return;
+ }
+
+ if(port == null || port.isEmpty()) {
+ LOG.warn("No " + metricsPropertyKey + "port property in tajo-metrics.properties");
+ return;
+ }
+
+ try {
+ ganglia = new GMetric(server, Integer.parseInt(port), GMetric.UDPAddressingMode.MULTICAST, 1);
+ } catch (Exception e) {
+ LOG.warn(e.getMessage(), e);
+ }
+ }
+
+ public void setPrefix(String prefix) {
+ this.prefix = prefix;
+ }
+
+ public void settMax(int tMax) {
+ this.tMax = tMax;
+ }
+
+ public void setdMax(int dMax) {
+ this.dMax = dMax;
+ }
+
+ @Override
+ public void report(SortedMap<String, Gauge> gauges,
+ SortedMap<String, Counter> counters,
+ SortedMap<String, Histogram> histograms,
+ SortedMap<String, Meter> meters,
+ SortedMap<String, Timer> timers) {
+ for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+ reportGauge(entry.getKey(), entry.getValue());
+ }
+
+ for (Map.Entry<String, Counter> entry : counters.entrySet()) {
+ reportCounter(entry.getKey(), entry.getValue());
+ }
+
+ for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
+ reportHistogram(entry.getKey(), entry.getValue());
+ }
+
+ for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+ reportMeter(entry.getKey(), entry.getValue());
+ }
+
+ for (Map.Entry<String, Timer> entry : timers.entrySet()) {
+ reportTimer(entry.getKey(), entry.getValue());
+ }
+ }
+
+ private void reportTimer(String name, Timer timer) {
+ final String group = group(name);
+ try {
+ final Snapshot snapshot = timer.getSnapshot();
+
+ announce(prefix(name, "max"), group, convertDuration(snapshot.getMax()), getDurationUnit());
+ announce(prefix(name, "mean"), group, convertDuration(snapshot.getMean()), getDurationUnit());
+ announce(prefix(name, "min"), group, convertDuration(snapshot.getMin()), getDurationUnit());
+ announce(prefix(name, "stddev"), group, convertDuration(snapshot.getStdDev()), getDurationUnit());
+
+ announce(prefix(name, "p50"), group, convertDuration(snapshot.getMedian()), getDurationUnit());
+ announce(prefix(name, "p75"),
+ group,
+ convertDuration(snapshot.get75thPercentile()),
+ getDurationUnit());
+ announce(prefix(name, "p95"),
+ group,
+ convertDuration(snapshot.get95thPercentile()),
+ getDurationUnit());
+ announce(prefix(name, "p98"),
+ group,
+ convertDuration(snapshot.get98thPercentile()),
+ getDurationUnit());
+ announce(prefix(name, "p99"),
+ group,
+ convertDuration(snapshot.get99thPercentile()),
+ getDurationUnit());
+ announce(prefix(name, "p999"),
+ group,
+ convertDuration(snapshot.get999thPercentile()),
+ getDurationUnit());
+
+ reportMetered(name, timer, group, "calls");
+ } catch (GangliaException e) {
+ LOG.warn("Unable to report timer {}", name, e);
+ }
+ }
+
+ private void reportMeter(String name, Meter meter) {
+ final String group = group(name);
+ try {
+ reportMetered(name, meter, group, "events");
+ } catch (GangliaException e) {
+ LOG.warn("Unable to report meter {}", name, e);
+ }
+ }
+
+ private void reportMetered(String name, Metered meter, String group, String eventName) throws GangliaException {
+ final String unit = eventName + '/' + getRateUnit();
+ announce(prefix(name, "count"), group, meter.getCount(), eventName);
+ announce(prefix(name, "m1_rate"), group, convertRate(meter.getOneMinuteRate()), unit);
+ announce(prefix(name, "m5_rate"), group, convertRate(meter.getFiveMinuteRate()), unit);
+ announce(prefix(name, "m15_rate"), group, convertRate(meter.getFifteenMinuteRate()), unit);
+ announce(prefix(name, "mean_rate"), group, convertRate(meter.getMeanRate()), unit);
+ }
+
+ private void reportHistogram(String name, Histogram histogram) {
+ final String group = group(name);
+ try {
+ final Snapshot snapshot = histogram.getSnapshot();
+
+ announce(prefix(name, "count"), group, histogram.getCount(), "");
+ announce(prefix(name, "max"), group, snapshot.getMax(), "");
+ announce(prefix(name, "mean"), group, snapshot.getMean(), "");
+ announce(prefix(name, "min"), group, snapshot.getMin(), "");
+ announce(prefix(name, "stddev"), group, snapshot.getStdDev(), "");
+ announce(prefix(name, "p50"), group, snapshot.getMedian(), "");
+ announce(prefix(name, "p75"), group, snapshot.get75thPercentile(), "");
+ announce(prefix(name, "p95"), group, snapshot.get95thPercentile(), "");
+ announce(prefix(name, "p98"), group, snapshot.get98thPercentile(), "");
+ announce(prefix(name, "p99"), group, snapshot.get99thPercentile(), "");
+ announce(prefix(name, "p999"), group, snapshot.get999thPercentile(), "");
+ } catch (GangliaException e) {
+ LOG.warn("Unable to report histogram {}", name, e);
+ }
+ }
+
+ private void reportCounter(String name, Counter counter) {
+ final String group = group(name);
+ try {
+ announce(prefix(name, "count"), group, counter.getCount(), "");
+ } catch (GangliaException e) {
+ LOG.warn("Unable to report counter {}", name, e);
+ }
+ }
+
+ private void reportGauge(String name, Gauge gauge) {
+ final String group = group(name);
+ final Object obj = gauge.getValue();
+
+ try {
+ ganglia.announce(name(prefix, name), String.valueOf(obj), detectType(obj), "",
+ GMetricSlope.BOTH, tMax, dMax, group);
+ } catch (GangliaException e) {
+ LOG.warn("Unable to report gauge {}", name, e);
+ }
+ }
+
+ private void announce(String name, String group, double value, String units) throws GangliaException {
+ ganglia.announce(name,
+ Double.toString(value),
+ GMetricType.DOUBLE,
+ units,
+ GMetricSlope.BOTH,
+ tMax,
+ dMax,
+ group);
+ }
+
+ private void announce(String name, String group, long value, String units) throws GangliaException {
+ final String v = Long.toString(value);
+ ganglia.announce(name,
+ v,
+ GMetricType.DOUBLE,
+ units,
+ GMetricSlope.BOTH,
+ tMax,
+ dMax,
+ group);
+ }
+
+ private GMetricType detectType(Object o) {
+ if (o instanceof Float) {
+ return GMetricType.FLOAT;
+ } else if (o instanceof Double) {
+ return GMetricType.DOUBLE;
+ } else if (o instanceof Byte) {
+ return GMetricType.INT8;
+ } else if (o instanceof Short) {
+ return GMetricType.INT16;
+ } else if (o instanceof Integer) {
+ return GMetricType.INT32;
+ } else if (o instanceof Long) {
+ return GMetricType.DOUBLE;
+ }
+ return GMetricType.STRING;
+ }
+
+ private String group(String name) {
+ String[] tokens = name.split("\\.");
+ if(tokens.length < 3) {
+ return "";
+ }
+ return tokens[0] + "." + tokens[1];
+ }
+
+ private String prefix(String name, String n) {
+ return name(prefix, name, n);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleReporter.java
new file mode 100644
index 0000000..80b77f1
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleReporter.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reporter that prints the current metrics to standard output.
+ */
+public class MetricsConsoleReporter extends TajoMetricsReporter {
+  /**
+   * Prints every non-empty metric category to {@code System.out}, issuing one
+   * {@code println} call per metric group. Categories are printed in the order
+   * gauges, counters, histograms, meters, timers.
+   *
+   * <p>The timestamp is taken once per call and formatted as {@code yyyy-MM-dd HH:mm:ss};
+   * no host/port value is passed to the group formatters ({@code null}).
+   */
+  @Override
+  public void report(SortedMap<String, Gauge> gauges,
+                     SortedMap<String, Counter> counters,
+                     SortedMap<String, Histogram> histograms,
+                     SortedMap<String, Meter> meters,
+                     SortedMap<String, Timer> timers) {
+    final double rateFactor = TimeUnit.SECONDS.toSeconds(1);
+    final Date now = new Date();
+    final String dateTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(now);
+
+    if (!gauges.isEmpty()) {
+      final Map<String, Map<String, Gauge>> grouped = findMetricsItemGroup(gauges);
+      for (Map.Entry<String, Map<String, Gauge>> entry : grouped.entrySet()) {
+        System.out.println(
+            gaugeGroupToString(dateTime, null, rateFactor, entry.getKey(), entry.getValue()));
+      }
+    }
+
+    if (!counters.isEmpty()) {
+      final Map<String, Map<String, Counter>> grouped = findMetricsItemGroup(counters);
+      for (Map.Entry<String, Map<String, Counter>> entry : grouped.entrySet()) {
+        System.out.println(
+            counterGroupToString(dateTime, null, rateFactor, entry.getKey(), entry.getValue()));
+      }
+    }
+
+    if (!histograms.isEmpty()) {
+      final Map<String, Map<String, Histogram>> grouped = findMetricsItemGroup(histograms);
+      for (Map.Entry<String, Map<String, Histogram>> entry : grouped.entrySet()) {
+        System.out.println(
+            histogramGroupToString(dateTime, null, rateFactor, entry.getKey(), entry.getValue()));
+      }
+    }
+
+    if (!meters.isEmpty()) {
+      final Map<String, Map<String, Meter>> grouped = findMetricsItemGroup(meters);
+      for (Map.Entry<String, Map<String, Meter>> entry : grouped.entrySet()) {
+        System.out.println(
+            meterGroupToString(dateTime, null, rateFactor, entry.getKey(), entry.getValue()));
+      }
+    }
+
+    if (!timers.isEmpty()) {
+      final Map<String, Map<String, Timer>> grouped = findMetricsItemGroup(timers);
+      for (Map.Entry<String, Map<String, Timer>> entry : grouped.entrySet()) {
+        System.out.println(
+            timerGroupToString(dateTime, null, rateFactor, entry.getKey(), entry.getValue()));
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleScheduledReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleScheduledReporter.java
new file mode 100644
index 0000000..286ef8d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleScheduledReporter.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+/**
+ * Scheduled stream reporter that writes metrics to standard output.
+ */
+public class MetricsConsoleScheduledReporter extends MetricsStreamScheduledReporter {
+  /** Name under which this reporter is identified ({@code "console"}). */
+  public static final String REPORTER_NAME = "console";
+
+  /**
+   * @return {@link #REPORTER_NAME}
+   */
+  @Override
+  protected String getReporterName() {
+    return REPORTER_NAME;
+  }
+
+  /**
+   * Directs the reporter output to {@code System.out}.
+   *
+   * <p>The stream is wrapped so that closing the reporter only flushes it: the inherited
+   * {@code close()} closes whatever stream was given to {@code setOutput}, and closing
+   * {@code System.out} itself would silence standard output for the rest of the JVM.
+   */
+  @Override
+  protected void afterInit() {
+    // Fully-qualified names are used because this file has no import section.
+    setOutput(new java.io.FilterOutputStream(System.out) {
+      @Override
+      public void write(byte[] b, int off, int len) throws java.io.IOException {
+        // Delegate in bulk; the FilterOutputStream default writes one byte at a time.
+        out.write(b, off, len);
+      }
+
+      @Override
+      public void close() throws java.io.IOException {
+        // Never close the process-wide stdout; just push out what has been written.
+        flush();
+      }
+    });
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
new file mode 100644
index 0000000..9e895b8
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+
+/**
+ * Scheduled stream reporter that appends metrics to the file named by the
+ * {@code <metricsPropertyKey>filename} property.
+ */
+public class MetricsFileScheduledReporter extends MetricsStreamScheduledReporter {
+  private static final Log LOG = LogFactory.getLog(MetricsFileScheduledReporter.class);
+  /** Name under which this reporter is identified ({@code "file"}). */
+  public static final String REPORTER_NAME = "file";
+
+  /**
+   * @return {@link #REPORTER_NAME}
+   */
+  @Override
+  protected String getReporterName() {
+    return REPORTER_NAME;
+  }
+
+  /**
+   * Opens the configured metrics file in append mode and installs it as the output stream.
+   *
+   * <p>When the filename property is missing, a warning is logged and no output is installed.
+   * Missing parent directories are created; a failure to create them is logged and opening
+   * the file is still attempted. When the file cannot be opened the failure is logged with
+   * its cause and the reporter is closed. On success the date format is cleared, so report
+   * timestamps are written as raw clock values.
+   */
+  @Override
+  protected void afterInit() {
+    String fileName = metricsProperties.get(metricsPropertyKey + "filename");
+    if (fileName == null) {
+      LOG.warn("No " + metricsPropertyKey + "filename property in tajo-metrics.properties");
+      return;
+    }
+    try {
+      File file = new File(fileName);
+      File parentFile = file.getParentFile();
+      if (parentFile != null && !parentFile.exists() && !parentFile.mkdirs()) {
+        LOG.warn("Can't create dir for tajo metrics:" + parentFile.getAbsolutePath());
+      }
+      this.setOutput(new FileOutputStream(fileName, true));
+      this.setDateFormat(null);
+    } catch (FileNotFoundException e) {
+      // Keep the cause in the log: it tells a missing directory apart from a permission problem.
+      LOG.warn("Can't open metrics file:" + fileName, e);
+      this.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java
new file mode 100644
index 0000000..4fbefd7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+import com.codahale.metrics.Timer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * Base class for scheduled reporters that write each metric group as text, followed by a
+ * newline, to the {@link OutputStream} installed through {@link #setOutput(OutputStream)}.
+ */
+public abstract class MetricsStreamScheduledReporter extends TajoMetricsScheduledReporter {
+  private static final Log LOG = LogFactory.getLog(MetricsStreamScheduledReporter.class);
+
+  /** Line terminator written after every metric group (platform default charset). */
+  private static final byte[] NEW_LINE = "\n".getBytes();
+
+  protected OutputStream output;
+  protected Locale locale;
+  protected Clock clock;
+  protected TimeZone timeZone;
+  protected MetricFilter filter;
+  protected DateFormat dateFormat;
+
+  /** Set once the missing-output warning has been logged, so it is not repeated per report. */
+  private boolean missingOutputWarned;
+
+  /**
+   * Creates a reporter that uses the default clock and the
+   * {@code yyyy-MM-dd HH:mm:ss} timestamp format. No output stream is installed yet.
+   */
+  public MetricsStreamScheduledReporter() {
+    dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    clock = Clock.defaultClock();
+  }
+
+  /**
+   * @param output stream that subsequent reports are written to; it is closed by {@link #close()}
+   */
+  public void setOutput(OutputStream output) {
+    this.output = output;
+  }
+
+  public void setLocale(Locale locale) {
+    this.locale = locale;
+  }
+
+  public void setClock(Clock clock) {
+    this.clock = clock;
+  }
+
+  /**
+   * Stores the time zone and applies it to the current date format, if there is one.
+   *
+   * @param timeZone time zone used when formatting report timestamps
+   */
+  public void setTimeZone(TimeZone timeZone) {
+    // The date format may have been cleared via setDateFormat(null); don't fail in that case.
+    if (this.dateFormat != null) {
+      this.dateFormat.setTimeZone(timeZone);
+    }
+    this.timeZone = timeZone;
+  }
+
+  /**
+   * @param dateFormat format for report timestamps; {@code null} makes reports carry the raw
+   *                   {@code clock.getTime()} value instead
+   */
+  public void setDateFormat(DateFormat dateFormat) {
+    this.dateFormat = dateFormat;
+  }
+
+  /**
+   * Writes every non-empty metric category to the output stream, group by group, in the
+   * order gauges, counters, histograms, meters, timers, and flushes the stream afterwards.
+   *
+   * <p>When no output stream has been installed the report is skipped; a warning is logged
+   * the first time this happens.
+   */
+  @Override
+  public void report(SortedMap<String, Gauge> gauges,
+                     SortedMap<String, Counter> counters,
+                     SortedMap<String, Histogram> histograms,
+                     SortedMap<String, Meter> meters,
+                     SortedMap<String, Timer> timers) {
+    if (output == null) {
+      // Without this guard the flush below would throw an uncaught NullPointerException.
+      if (!missingOutputWarned) {
+        missingOutputWarned = true;
+        LOG.warn("No output stream is set for metrics reporter '" + getReporterName()
+            + "'; skipping reports");
+      }
+      return;
+    }
+
+    final String dateTime = dateFormat == null
+        ? String.valueOf(clock.getTime())
+        : dateFormat.format(new Date(clock.getTime()));
+
+    if (!gauges.isEmpty()) {
+      Map<String, Map<String, Gauge>> gaugeGroups = findMetricsItemGroup(gauges);
+
+      for (Map.Entry<String, Map<String, Gauge>> eachGroup : gaugeGroups.entrySet()) {
+        printGaugeGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+      }
+    }
+
+    if (!counters.isEmpty()) {
+      Map<String, Map<String, Counter>> counterGroups = findMetricsItemGroup(counters);
+
+      for (Map.Entry<String, Map<String, Counter>> eachGroup : counterGroups.entrySet()) {
+        printCounterGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+      }
+    }
+
+    if (!histograms.isEmpty()) {
+      Map<String, Map<String, Histogram>> histogramGroups = findMetricsItemGroup(histograms);
+
+      for (Map.Entry<String, Map<String, Histogram>> eachGroup : histogramGroups.entrySet()) {
+        printHistogramGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+      }
+    }
+
+    if (!meters.isEmpty()) {
+      Map<String, Map<String, Meter>> meterGroups = findMetricsItemGroup(meters);
+
+      for (Map.Entry<String, Map<String, Meter>> eachGroup : meterGroups.entrySet()) {
+        printMeterGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+      }
+    }
+
+    if (!timers.isEmpty()) {
+      Map<String, Map<String, Timer>> timerGroups = findMetricsItemGroup(timers);
+
+      for (Map.Entry<String, Map<String, Timer>> eachGroup : timerGroups.entrySet()) {
+        printTimerGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+      }
+    }
+
+    try {
+      output.flush();
+    } catch (IOException e) {
+      LOG.warn("Failed to flush metrics output: " + e.getMessage(), e);
+    }
+  }
+
+  /** Writes one formatted group followed by a newline (platform default charset). */
+  private void writeLine(String line) throws IOException {
+    output.write(line.getBytes());
+    output.write(NEW_LINE);
+  }
+
+  // Each print method is best effort: a failure while formatting or writing one group is
+  // logged and must not prevent the remaining groups from being reported.
+
+  private void printMeterGroup(String dateTime, String groupName, Map<String, Meter> meters) {
+    try {
+      writeLine(meterGroupToString(dateTime, hostAndPort, rateFactor, groupName, meters));
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+  }
+
+  private void printCounterGroup(String dateTime, String groupName, Map<String, Counter> counters) {
+    try {
+      writeLine(counterGroupToString(dateTime, hostAndPort, rateFactor, groupName, counters));
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+  }
+
+  private void printGaugeGroup(String dateTime, String groupName, Map<String, Gauge> gauges) {
+    try {
+      writeLine(gaugeGroupToString(dateTime, hostAndPort, rateFactor, groupName, gauges));
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+  }
+
+  private void printHistogramGroup(String dateTime, String groupName, Map<String, Histogram> histograms) {
+    try {
+      writeLine(histogramGroupToString(dateTime, hostAndPort, rateFactor, groupName, histograms));
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+  }
+
+  private void printTimerGroup(String dateTime, String groupName, Map<String, Timer> timers) {
+    try {
+      writeLine(timerGroupToString(dateTime, hostAndPort, rateFactor, groupName, timers));
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Closes the output stream, if one is installed, and then closes the parent reporter.
+   * A failure to close the stream is logged and does not prevent the parent from closing.
+   */
+  @Override
+  public void close() {
+    if (output != null) {
+      try {
+        output.close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close metrics output: " + e.getMessage(), e);
+      }
+    }
+
+    super.close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/NullReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/NullReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/NullReporter.java
new file mode 100644
index 0000000..9dc1755
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/NullReporter.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+
+import java.util.SortedMap;
+
+/**
+ * Reporter that discards every metric it is given.
+ */
+public class NullReporter extends TajoMetricsReporter {
+  /**
+   * Does nothing; all arguments are ignored.
+   */
+  @Override
+  public void report(SortedMap<String, Gauge> gauges,
+                     SortedMap<String, Counter> counters,
+                     SortedMap<String, Histogram> histograms,
+                     SortedMap<String, Meter> meters,
+                     SortedMap<String, Timer> timers) {
+    // Intentionally empty.
+  }
+}