You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/22 06:13:25 UTC

[35/47] incubator-kylin git commit: KYLIN-875 rename modules: core-common, core-cube, core-dictionary, core-cube

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
new file mode 100644
index 0000000..c9f1e08
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+
+public class BytesUtil {
+
+    public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+
+    public static void writeLong(long num, byte[] bytes, int offset, int size) {
+        for (int i = offset + size - 1; i >= offset; i--) {
+            bytes[i] = (byte) num;
+            num >>>= 8;
+        }
+    }
+
+    public static void writeUnsigned(int num, byte[] bytes, int offset, int size) {
+        for (int i = offset + size - 1; i >= offset; i--) {
+            bytes[i] = (byte) num;
+            num >>>= 8;
+        }
+    }
+
+    public static long readLong(byte[] bytes, int offset, int size) {
+        long integer = 0;
+        for (int i = offset, n = offset + size; i < n; i++) {
+            integer <<= 8;
+            integer |= (long) bytes[i] & 0xFF;
+        }
+        return integer;
+    }
+
+    public static int readUnsigned(byte[] bytes, int offset, int size) {
+        int integer = 0;
+        for (int i = offset, n = offset + size; i < n; i++) {
+            integer <<= 8;
+            integer |= (int) bytes[i] & 0xFF;
+        }
+        return integer;
+    }
+
+    public static void writeSigned(int num, byte[] bytes, int offset, int size) {
+        writeUnsigned(num, bytes, offset, size);
+    }
+
+    public static int readSigned(byte[] bytes, int offset, int size) {
+        int integer = (bytes[offset] & 0x80) == 0 ? 0 : -1;
+        for (int i = offset, n = offset + size; i < n; i++) {
+            integer <<= 8;
+            integer |= (int) bytes[i] & 0xFF;
+        }
+        return integer;
+    }
+
+    /**
+     * No. bytes needed to store a value as big as the given
+     */
+    public static int sizeForValue(int maxValue) {
+        int size = 0;
+        while (maxValue > 0) {
+            size++;
+            maxValue >>>= 8;
+        }
+        return size;
+    }
+
+    public static int compareByteUnsigned(byte b1, byte b2) {
+        int i1 = (int) b1 & 0xFF;
+        int i2 = (int) b2 & 0xFF;
+        return i1 - i2;
+    }
+
+    public static byte[] subarray(byte[] bytes, int start, int end) {
+        byte[] r = new byte[end - start];
+        System.arraycopy(bytes, start, r, 0, r.length);
+        return r;
+    }
+
+    public static int compareBytes(byte[] src, int srcOffset, byte[] dst, int dstOffset, int length) {
+        int r = 0;
+        for (int i = 0; i < length; i++) {
+            r = src[srcOffset + i] - dst[dstOffset + i];
+            if (r != 0)
+                break;
+        }
+        return r;
+    }
+
+    // from WritableUtils
+    // ============================================================================
+
+    public static void writeVInt(int i, ByteBuffer out) {
+        writeVLong(i, out);
+    }
+
+    public static void writeVLong(long i, ByteBuffer out) {
+        if (i >= -112 && i <= 127) {
+            out.put((byte) i);
+            return;
+        }
+
+        int len = -112;
+        if (i < 0) {
+            i ^= -1L; // take one's complement'
+            len = -120;
+        }
+
+        long tmp = i;
+        while (tmp != 0) {
+            tmp = tmp >> 8;
+            len--;
+        }
+
+        out.put((byte) len);
+
+        len = (len < -120) ? -(len + 120) : -(len + 112);
+
+        for (int idx = len; idx != 0; idx--) {
+            int shiftbits = (idx - 1) * 8;
+            long mask = 0xFFL << shiftbits;
+            out.put((byte) ((i & mask) >> shiftbits));
+        }
+    }
+
+    public static long readVLong(ByteBuffer in) {
+        byte firstByte = in.get();
+        int len = decodeVIntSize(firstByte);
+        if (len == 1) {
+            return firstByte;
+        }
+        long i = 0;
+        for (int idx = 0; idx < len - 1; idx++) {
+            byte b = in.get();
+            i = i << 8;
+            i = i | (b & 0xFF);
+        }
+        return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
+    }
+
+    public static int readVInt(ByteBuffer in) {
+        long n = readVLong(in);
+        if ((n > Integer.MAX_VALUE) || (n < Integer.MIN_VALUE)) {
+            throw new IllegalArgumentException("value too long to fit in integer");
+        }
+        return (int) n;
+    }
+
+    private static boolean isNegativeVInt(byte value) {
+        return value < -120 || (value >= -112 && value < 0);
+    }
+
+    private static int decodeVIntSize(byte value) {
+        if (value >= -112) {
+            return 1;
+        } else if (value < -120) {
+            return -119 - value;
+        }
+        return -111 - value;
+    }
+
+    public static void writeUnsigned(int num, int size, ByteBuffer out) {
+        int mask = 0xff << ((size - 1) * 8);
+        for (int i = size; i > 0; i--) {
+            int v = (num & mask) >> (i - 1) * 8;
+            out.put((byte) v);
+            mask = mask >> 8;
+        }
+    }
+
+    public static int readUnsigned(ByteBuffer in, int size) {
+        int integer = 0;
+        for (int i = 0; i < size; i++) {
+            integer = integer << 8;
+            integer |= (in.get() & 0xff);
+        }
+
+        return integer;
+    }
+
+    public static void writeLong(long num, ByteBuffer out) {
+        for (int i = 0; i < 8; i++) {
+            out.put((byte) num);
+            num >>>= 8;
+        }
+    }
+
+    public static long readLong(ByteBuffer in) {
+        long integer = 0;
+        int mask = 0xff;
+        int shift = 0;
+        for (int i = 0; i < 8; i++) {
+            integer |= (in.get() << shift) & mask;
+            mask = mask << 8;
+            shift += 8;
+        }
+        return integer;
+    }
+
+    public static void writeUTFString(String str, ByteBuffer out) {
+        byte[] bytes = str == null ? null : Bytes.toBytes(str);
+        writeByteArray(bytes, out);
+    }
+
+    public static String readUTFString(ByteBuffer in) {
+        byte[] bytes = readByteArray(in);
+        return bytes == null ? null : Bytes.toString(bytes);
+    }
+
+    public static void writeAsciiString(String str, ByteBuffer out) {
+        if (str == null) {
+            BytesUtil.writeVInt(-1, out);
+            return;
+        }
+        int len = str.length();
+        BytesUtil.writeVInt(len, out);
+        for (int i = 0; i < len; i++) {
+            out.put((byte) str.charAt(i));
+        }
+    }
+
+    public static String readAsciiString(ByteBuffer in) {
+        int len = BytesUtil.readVInt(in);
+        if (len < 0) {
+            return null;
+        }
+        String result;
+        try {
+            if (in.hasArray()) {
+                int pos = in.position();
+                result = new String(in.array(), pos, len, "ISO-8859-1");
+                in.position(pos + len);
+            } else {
+                byte[] tmp = new byte[len];
+                in.get(tmp);
+                result = new String(tmp, "ISO-8859-1");
+            }
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e); // never happen
+        }
+        return result;
+    }
+
+    public static void writeAsciiStringArray(String[] strs, ByteBuffer out) {
+        writeVInt(strs.length, out);
+        for (int i = 0; i < strs.length; i++)
+            writeAsciiString(strs[i], out);
+    }
+
+    public static String[] readAsciiStringArray(ByteBuffer in) {
+        int len = readVInt(in);
+        String[] strs = new String[len];
+        for (int i = 0; i < len; i++)
+            strs[i] = readAsciiString(in);
+        return strs;
+    }
+
+    public static void writeIntArray(int[] array, ByteBuffer out) {
+        if (array == null) {
+            writeVInt(-1, out);
+            return;
+        }
+        writeVInt(array.length, out);
+        for (int i = 0; i < array.length; ++i) {
+            writeVInt(array[i], out);
+        }
+    }
+
+    public static int[] readIntArray(ByteBuffer in) {
+        int len = readVInt(in);
+        if (len < 0)
+            return null;
+        int[] array = new int[len];
+
+        for (int i = 0; i < len; ++i) {
+            array[i] = readVInt(in);
+        }
+        return array;
+    }
+
+    public static void writeByteArray(byte[] array, ByteBuffer out) {
+        if (array == null) {
+            writeVInt(-1, out);
+            return;
+        }
+        writeVInt(array.length, out);
+        out.put(array);
+    }
+
+    public static void writeByteArray(byte[] array, int offset, int length, ByteBuffer out) {
+        if (array == null) {
+            writeVInt(-1, out);
+            return;
+        }
+        writeVInt(array.length, out);
+        out.put(array, offset, length);
+    }
+
+    public static byte[] readByteArray(ByteBuffer in) {
+        int len = readVInt(in);
+        if (len < 0)
+            return null;
+
+        byte[] array = new byte[len];
+        in.get(array);
+        return array;
+    }
+
+    public static int peekByteArrayLength(ByteBuffer in) {
+        int start = in.position();
+        int arrayLen = readVInt(in);
+        int sizeLen = in.position() - start;
+        in.position(start);
+
+        if (arrayLen < 0)
+            return sizeLen;
+        else
+            return sizeLen + arrayLen;
+    }
+
+    public static void writeBooleanArray(boolean[] array, ByteBuffer out) {
+        if (array == null) {
+            writeVInt(-1, out);
+            return;
+        }
+        writeVInt(array.length, out);
+        byte b_true = (byte) 1;
+        byte b_false = (byte) 0;
+        for (int i = 0; i < array.length; i++) {
+            if (array[i])
+                out.put(b_true);
+            else
+                out.put(b_false);
+        }
+    }
+
+    public static boolean[] readBooleanArray(ByteBuffer in) {
+        int len = readVInt(in);
+        if (len < 0)
+            return null;
+
+        boolean[] array = new boolean[len];
+        byte b_true = (byte) 1;
+        for (int i = 0; i < array.length; i++) {
+            byte temp = in.get();
+            if (temp == b_true)
+                array[i] = true;
+            else
+                array[i] = false;
+        }
+        return array;
+    }
+
+    public static byte[] toBytes(Writable writable) {
+        try {
+            ByteArrayOutputStream bout = new ByteArrayOutputStream();
+            DataOutputStream out = new DataOutputStream(bout);
+            writable.write(out);
+            out.close();
+            bout.close();
+            return bout.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static String toReadableText(byte[] array) {
+        if (array == null)
+            return null;
+        return toHex(array);
+    }
+
+
+    /**
+     * this method only works for hex strings
+     */
+    public static byte[] fromReadableText(String text) {
+        String[] tokens = text.split("\\\\x");
+        byte[] ret = new byte[tokens.length - 1];
+        for (int i = 1; i < tokens.length; ++i) {
+            int x = Bytes.toBinaryFromHex((byte) tokens[i].charAt(0));
+            x = x << 4;
+            int y = Bytes.toBinaryFromHex((byte) tokens[i].charAt(1));
+            ret[i - 1] = (byte) (x + y);
+        }
+        return ret;
+    }
+
+    public static String toHex(byte[] array) {
+        return toHex(new ImmutableBytesWritable(array));
+    }
+
+    public static String toHex(ImmutableBytesWritable bytes) {
+        byte[] array = bytes.get();
+        int offset = bytes.getOffset();
+        int length = bytes.getLength();
+        StringBuilder sb = new StringBuilder(length * 4);
+        for (int i = 0; i < length; i++) {
+            int b = array[offset + i];
+            sb.append(String.format("\\x%02X", b & 0xFF));
+        }
+        return sb.toString();
+    }
+
+
+    public static void main(String[] args) throws Exception {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/CaseInsensitiveStringMap.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CaseInsensitiveStringMap.java b/core-common/src/main/java/org/apache/kylin/common/util/CaseInsensitiveStringMap.java
new file mode 100644
index 0000000..d6de581
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/CaseInsensitiveStringMap.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ */
+public class CaseInsensitiveStringMap<T> implements Map<String, T> {
+
+    private Map<String, T> innerMap;
+
+    public CaseInsensitiveStringMap() {
+        this(new HashMap<String, T>());
+    }
+
+    public CaseInsensitiveStringMap(Map<String, T> innerMap) {
+        this.innerMap = innerMap;
+    }
+
+    @Override
+    public int size() {
+        return innerMap.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return innerMap.isEmpty();
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return key != null ? innerMap.containsKey(key.toString().toUpperCase()) : false;
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        return value != null ? innerMap.containsValue(value) : false;
+    }
+
+    @Override
+    public T get(Object key) {
+        return key != null ? innerMap.get(key.toString().toUpperCase()) : null;
+    }
+
+    @Override
+    public T put(String key, T value) {
+        return key != null ? innerMap.put(key.toString().toUpperCase(), value) : null;
+    }
+
+    @Override
+    public T remove(Object key) {
+        return key != null ? innerMap.remove(key.toString().toUpperCase()) : null;
+    }
+
+    @Override
+    public void putAll(Map<? extends String, ? extends T> m) {
+        innerMap.putAll(m);
+    }
+
+    @Override
+    public void clear() {
+        innerMap.clear();
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return innerMap.keySet();
+    }
+
+    @Override
+    public Collection<T> values() {
+        return innerMap.values();
+    }
+
+    @Override
+    public Set<Entry<String, T>> entrySet() {
+        return innerMap.entrySet();
+    }
+
+    @Override
+    public String toString() {
+        return this.innerMap.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
new file mode 100644
index 0000000..38167a8
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+/**
+ */
+public class ClassUtil {
+
+    public static void addClasspath(String path) throws Exception {
+        System.out.println("Adding path " + path + " to class path");
+        File file = new File(path);
+
+        if (file.exists()) {
+            URLClassLoader urlClassLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
+            Class<URLClassLoader> urlClass = URLClassLoader.class;
+            Method method = urlClass.getDeclaredMethod("addURL", new Class[] { URL.class });
+            method.setAccessible(true);
+            method.invoke(urlClassLoader, new Object[] { file.toURI().toURL() });
+        }
+    }
+
+    private static final WeakHashMap<String, Class<?>> forNameCache = new WeakHashMap<>();
+    private static final Map<String, String> classRenameMap;
+    static {
+        classRenameMap = new HashMap<>();
+        classRenameMap.put("org.apache.kylin.job.cube.CubingJob", "org.apache.kylin.engine.mr.CubingJob");
+        classRenameMap.put("org.apache.kylin.job.cube.GarbageCollectionStep", "org.apache.kylin.engine.mr.GarbageCollectionStep");
+        classRenameMap.put("org.apache.kylin.job.cube.MergeDictionaryStep", "org.apache.kylin.engine.mr.MergeDictionaryStep");
+        classRenameMap.put("org.apache.kylin.job.cube.UpdateCubeInfoAfterBuildStep", "org.apache.kylin.engine.mr.UpdateCubeInfoAfterBuildStep");
+        classRenameMap.put("org.apache.kylin.job.cube.UpdateCubeInfoAfterMergeStep", "org.apache.kylin.engine.mr.UpdateCubeInfoAfterMergeStep");
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> Class<? extends T> forName(String name, Class<T> clz) throws ClassNotFoundException {
+        String origName = name;
+        
+        Class<? extends T> result = (Class<? extends T>) forNameCache.get(origName);
+        if (result == null) {
+            name = forRenamedClass(name);
+            result = (Class<? extends T>) Class.forName(name);
+            forNameCache.put(origName, result);
+        }
+        return result;
+    }
+
+    private static String forRenamedClass(String name) {
+        if (name.startsWith("com.kylinolap")) {
+            name = "org.apache.kylin" + name.substring("com.kylinolap".length());
+        }
+        String rename = classRenameMap.get(name);
+        return rename == null ? name : rename;
+    }
+
+    public static Object newInstance(String clz) {
+        try {
+            return forName(clz, Object.class).newInstance();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
new file mode 100644
index 0000000..c8a84a3
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.util.Pair;
+
+/**
+ * @author yangli9
+ */
+public class CliCommandExecutor {
+
+    private String remoteHost;
+    private int port;
+    private String remoteUser;
+    private String remotePwd;
+    private int remoteTimeoutSeconds = 3600;
+
+    public CliCommandExecutor() {
+    }
+
+    public void setRunAtRemote(String host, int port, String user, String pwd) {
+        this.remoteHost = host;
+        this.port = port;
+        this.remoteUser = user;
+        this.remotePwd = pwd;
+    }
+
+    public void setRunAtLocal() {
+        this.remoteHost = null;
+        this.remoteUser = null;
+        this.remotePwd = null;
+    }
+
+    public void copyFile(String localFile, String destDir) throws IOException {
+        if (remoteHost == null)
+            copyNative(localFile, destDir);
+        else
+            copyRemote(localFile, destDir);
+    }
+
+    private void copyNative(String localFile, String destDir) throws IOException {
+        File src = new File(localFile);
+        File dest = new File(destDir, src.getName());
+        FileUtils.copyFile(src, dest);
+    }
+
+    private void copyRemote(String localFile, String destDir) throws IOException {
+        SSHClient ssh = new SSHClient(remoteHost, port, remoteUser, remotePwd);
+        try {
+            ssh.scpFileToRemote(localFile, destDir);
+        } catch (IOException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
+    public Pair<Integer, String> execute(String command) throws IOException {
+        return execute(command, new SoutLogger());
+    }
+
+    public Pair<Integer, String> execute(String command, Logger logAppender) throws IOException {
+        Pair<Integer, String> r;
+        if (remoteHost == null) {
+            r = runNativeCommand(command, logAppender);
+        } else {
+            r = runRemoteCommand(command, logAppender);
+        }
+
+        if (r.getFirst() != 0)
+            throw new IOException("OS command error exit with " + r.getFirst() //
+                    + (remoteHost == null ? "" : " (remoteHost:" + remoteHost + ")") //
+                    + " -- " + command + "\n" + r.getSecond());
+
+        return r;
+    }
+
+    private Pair<Integer, String> runRemoteCommand(String command, Logger logAppender) throws IOException {
+        SSHClient ssh = new SSHClient(remoteHost, port, remoteUser, remotePwd);
+
+        SSHClientOutput sshOutput;
+        try {
+            sshOutput = ssh.execCommand(command, remoteTimeoutSeconds, logAppender);
+            int exitCode = sshOutput.getExitCode();
+            String output = sshOutput.getText();
+            return new Pair<Integer, String>(exitCode, output);
+        } catch (IOException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
+    private Pair<Integer, String> runNativeCommand(String command, Logger logAppender) throws IOException {
+        String[] cmd = new String[3];
+        String osName = System.getProperty("os.name");
+        if (osName.startsWith("Windows")) {
+            cmd[0] = "cmd.exe";
+            cmd[1] = "/C";
+        } else {
+            cmd[0] = "/bin/bash";
+            cmd[1] = "-c";
+        }
+        cmd[2] = command;
+
+        ProcessBuilder builder = new ProcessBuilder(cmd);
+        builder.redirectErrorStream(true);
+        Process proc = builder.start();
+
+        BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+        String line;
+        StringBuilder result = new StringBuilder();
+        while ((line = reader.readLine()) != null) {
+            result.append(line).append('\n');
+            if (logAppender != null) {
+                logAppender.log(line);
+            }
+        }
+
+        try {
+            int exitCode = proc.waitFor();
+            return new Pair<Integer, String>(exitCode, result.toString());
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
new file mode 100644
index 0000000..6dda7ec
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
@@ -0,0 +1,54 @@
+package org.apache.kylin.common.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class CompressionUtils {
+    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(CompressionUtils.class);
+
+
+    public static byte[] compress(byte[] data) throws IOException {
+        Deflater deflater = new Deflater();
+        deflater.setInput(data);
+
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(data.length);
+
+        deflater.finish();
+        byte[] buffer = new byte[1024];
+        while (!deflater.finished()) {
+            int count = deflater.deflate(buffer); // returns the generated code... index
+            outputStream.write(buffer, 0, count);
+        }
+        outputStream.close();
+        byte[] output = outputStream.toByteArray();
+
+        logger.info("Original: " + data.length / 1024 + " Kb");
+        logger.info("Compressed: " + output.length / 1024 + " Kb");
+        return output;
+    }
+
+    public static byte[] decompress(byte[] data) throws IOException, DataFormatException {
+        Inflater inflater = new Inflater();
+        inflater.setInput(data);
+
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(data.length);
+        byte[] buffer = new byte[1024];
+        while (!inflater.finished()) {
+            int count = inflater.inflate(buffer);
+            outputStream.write(buffer, 0, count);
+        }
+        outputStream.close();
+        byte[] output = outputStream.toByteArray();
+
+        logger.info("Original: " + data.length);
+        logger.info("Decompressed: " + output.length);
+        return output;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java b/core-common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java
new file mode 100644
index 0000000..bc4502c
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java
@@ -0,0 +1,15 @@
+package org.apache.kylin.common.util;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ */
+public class DaemonThreadFactory implements ThreadFactory {
+    @Override
+    public Thread newThread(Runnable r) {
+        Thread t = Executors.defaultThreadFactory().newThread(r);
+        t.setDaemon(true);
+        return t;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java b/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
new file mode 100644
index 0000000..f74debd
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
@@ -0,0 +1,88 @@
+package org.apache.kylin.common.util;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DateFormat {
+
+    public static final String DEFAULT_DATE_PATTERN = "yyyy-MM-dd";
+    public static final String DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS = "yyyy-MM-dd HH:mm:ss";
+    public static final String DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS = "yyyy-MM-dd HH:mm:ss.SSS";
+
+    static final private Map<String, ThreadLocal<SimpleDateFormat>> threadLocalMap = new ConcurrentHashMap<String, ThreadLocal<SimpleDateFormat>>();
+
+    public static SimpleDateFormat getDateFormat(String datePattern) {
+        ThreadLocal<SimpleDateFormat> formatThreadLocal = threadLocalMap.get(datePattern);
+        if (formatThreadLocal == null) {
+            threadLocalMap.put(datePattern, formatThreadLocal = new ThreadLocal<SimpleDateFormat>());
+        }
+        SimpleDateFormat format = formatThreadLocal.get();
+        if (format == null) {
+            format = new SimpleDateFormat(datePattern);
+            format.setTimeZone(TimeZone.getTimeZone("GMT")); // NOTE: this must be GMT to calculate epoch date correctly
+            formatThreadLocal.set(format);
+        }
+        return format;
+    }
+
+    public static String formatToDateStr(long millis) {
+        return formatToTimeStr(millis, DEFAULT_DATE_PATTERN);
+    }
+
+    public static String formatToTimeStr(long millis) {
+        return formatToTimeStr(millis, DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS);
+    }
+
+    public static String formatToTimeWithoutMilliStr(long millis) {
+        return formatToTimeStr(millis, DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+    }
+
+    public static String formatToTimeStr(long millis, String pattern) {
+        return getDateFormat(pattern).format(new Date(millis));
+    }
+
+    public static String dateToString(Date date, String pattern) {
+        return getDateFormat(pattern).format(date);
+    }
+
+    public static Date stringToDate(String str) {
+        return stringToDate(str, DEFAULT_DATE_PATTERN);
+    }
+
+    public static Date stringToDate(String str, String pattern) {
+        Date date = null;
+        try {
+            date = getDateFormat(pattern).parse(str);
+        } catch (ParseException e) {
+            throw new IllegalArgumentException("'" + str + "' is not a valid date of pattern '" + pattern + "'", e);
+        }
+        return date;
+    }
+
+    public static long stringToMillis(String str) {
+        if (isAllDigits(str)) {
+            return Long.parseLong(str);
+        } else if (str.length() == 10) {
+            return stringToDate(str, DEFAULT_DATE_PATTERN).getTime();
+        } else if (str.length() == 19) {
+            return stringToDate(str, DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS).getTime();
+        } else if (str.length() > 19) {
+            return stringToDate(str, DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS).getTime();
+        } else {
+            throw new IllegalArgumentException("there is no valid date pattern for:" + str);
+        }
+    }
+
+    private static boolean isAllDigits(String str) {
+        for (int i = 0, n = str.length(); i < n; i++) {
+            if (Character.isDigit(str.charAt(i)) == false)
+                return false;
+        }
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java b/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java
new file mode 100644
index 0000000..4c4bc6b
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java
@@ -0,0 +1,19 @@
+package org.apache.kylin.common.util;
+
+import java.util.Iterator;
+import java.util.Queue;
+
+/**
+ */
+public class FIFOIterable<T> implements Iterable<T> {
+    private Queue<T> q;
+
+    public FIFOIterable(Queue<T> q) {
+        this.q = q;
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+        return new FIFOIterator<T>(q);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java b/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java
new file mode 100644
index 0000000..f734143
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java
@@ -0,0 +1,33 @@
+package org.apache.kylin.common.util;
+
+import java.util.Iterator;
+import java.util.Queue;
+
+/**
+ *
+ * Normal iterators in Collections are fail-safe,
+ * i.e. adding elements to a queue will break current iterator.
+ * The FIFOIterator is stateless, it only check the first element of a Queue
+ */
+public class FIFOIterator<T> implements Iterator<T> {
+    private Queue<T> q;
+
+    public FIFOIterator(Queue<T> q) {
+        this.q = q;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return !q.isEmpty();
+    }
+
+    @Override
+    public T next() {
+        return q.poll();
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java b/core-common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
new file mode 100644
index 0000000..12b0c40
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+/** This class will come with HBase 2.0 in package org.apache.hadoop.hbase.util **/
+package org.apache.kylin.common.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.kylin.common.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HBaseRegionSizeCalculator {
+
+    private static final Logger logger = LoggerFactory.getLogger(HBaseRegionSizeCalculator.class);
+
+    /**
+     * Maps each region to its size in bytes.
+     **/
+    private final Map<byte[], Long> sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+
+    static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable";
+
+    /**
+     * Computes size of each region for table and given column families.
+     * */
+    public HBaseRegionSizeCalculator(HTable table) throws IOException {
+        this(table, new HBaseAdmin(table.getConfiguration()));
+    }
+
+    /** Constructor for unit testing */
+    HBaseRegionSizeCalculator(HTable table, HBaseAdmin hBaseAdmin) throws IOException {
+
+        try {
+            if (!enabled(table.getConfiguration())) {
+                logger.info("Region size calculation disabled.");
+                return;
+            }
+
+            logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\".");
+
+            // Get regions for table.
+            Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet();
+            Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+
+            for (HRegionInfo regionInfo : tableRegionInfos) {
+                tableRegions.add(regionInfo.getRegionName());
+            }
+
+            ClusterStatus clusterStatus = hBaseAdmin.getClusterStatus();
+            Collection<ServerName> servers = clusterStatus.getServers();
+            final long megaByte = 1024L * 1024L;
+
+            // Iterate all cluster regions, filter regions from our table and
+            // compute their size.
+            for (ServerName serverName : servers) {
+                ServerLoad serverLoad = clusterStatus.getLoad(serverName);
+
+                for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) {
+                    byte[] regionId = regionLoad.getName();
+
+                    if (tableRegions.contains(regionId)) {
+
+                        long regionSizeBytes = regionLoad.getStorefileSizeMB() * megaByte;
+                        sizeMap.put(regionId, regionSizeBytes);
+
+                        // logger.info("Region " + regionLoad.getNameAsString()
+                        // + " has size " + regionSizeBytes);
+                    }
+                }
+            }
+        } finally {
+            hBaseAdmin.close();
+        }
+
+    }
+
+    boolean enabled(Configuration configuration) {
+        return configuration.getBoolean(ENABLE_REGIONSIZECALCULATOR, true);
+    }
+
+    /**
+     * Returns size of given region in bytes. Returns 0 if region was not found.
+     **/
+    public long getRegionSize(byte[] regionId) {
+        Long size = sizeMap.get(regionId);
+        if (size == null) {
+            logger.info("Unknown region:" + Arrays.toString(regionId));
+            return 0;
+        } else {
+            return size;
+        }
+    }
+
+    public Map<byte[], Long> getRegionSizeMap() {
+        return Collections.unmodifiableMap(sizeMap);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
new file mode 100644
index 0000000..fb6b9bb
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HadoopUtil {
+    private static final Logger logger = LoggerFactory.getLogger(HadoopUtil.class);
+
+    private static ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>();
+
+    public static void setCurrentConfiguration(Configuration conf) {
+        hadoopConfig.set(conf);
+    }
+
+    public static Configuration getCurrentConfiguration() {
+        if (hadoopConfig.get() == null) {
+            hadoopConfig.set(new Configuration());
+        }
+        return hadoopConfig.get();
+    }
+
+    public static FileSystem getFileSystem(String path) throws IOException {
+        return FileSystem.get(makeURI(path), getCurrentConfiguration());
+    }
+
+    public static URI makeURI(String filePath) {
+        try {
+            return new URI(fixWindowsPath(filePath));
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Cannot create FileSystem from URI: " + filePath, e);
+        }
+    }
+
+    public static String fixWindowsPath(String path) {
+        // fix windows path
+        if (path.startsWith("file://") && !path.startsWith("file:///") && path.contains(":\\")) {
+            path = path.replace("file://", "file:///");
+        }
+        if (path.startsWith("file:///")) {
+            path = path.replace('\\', '/');
+        }
+        return path;
+    }
+
+    public static Configuration newHadoopJobConfiguration() {
+        Configuration conf = new Configuration();
+        conf.set(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, "8");
+        return conf;
+    }
+
+    /**
+     * e.g.
+     * 0. hbase (recommended way)
+     * 1. hbase:zk-1.hortonworks.com,zk-2.hortonworks.com,zk-3.hortonworks.com:2181:/hbase-unsecure
+     * 2. hbase:zk-1.hortonworks.com,zk-2.hortonworks.com,zk-3.hortonworks.com:2181
+     * 3. hbase:zk-1.hortonworks.com:2181:/hbase-unsecure
+     * 4. hbase:zk-1.hortonworks.com:2181
+     */
+    public static Configuration newHBaseConfiguration(String url) {
+        Configuration conf = HBaseConfiguration.create();
+        // reduce rpc retry
+        conf.set(HConstants.HBASE_CLIENT_PAUSE, "3000");
+        conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "5");
+        conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
+        // conf.set(ScannerCallable.LOG_SCANNER_ACTIVITY, "true");
+        if (StringUtils.isEmpty(url)) {
+            return conf;
+        }
+
+        // chop off "hbase"
+        if (url.startsWith("hbase") == false) {
+            throw new IllegalArgumentException("hbase url must start with 'hbase' -- " + url);
+        }
+
+        url = StringUtils.substringAfter(url, "hbase");
+        if (StringUtils.isEmpty(url)) {
+            return conf;
+        }
+
+        // case of "hbase:domain.com:2181:/hbase-unsecure"
+        Pattern urlPattern = Pattern.compile("[:]((?:[\\w\\-.]+)(?:\\,[\\w\\-.]+)*)[:](\\d+)(?:[:](.+))");
+        Matcher m = urlPattern.matcher(url);
+        if (m.matches() == false)
+            throw new IllegalArgumentException("HBase URL '" + url + "' is invalid, expected url is like '" + "hbase:domain.com:2181:/hbase-unsecure" + "'");
+
+        logger.debug("Creating hbase conf by parsing -- " + url);
+
+        String quorums = m.group(1);
+        String quorum = null;
+        try {
+            String[] tokens = quorums.split(",");
+            for (String s : tokens) {
+                quorum = s;
+                InetAddress.getByName(quorum);
+            }
+        } catch (UnknownHostException e) {
+            throw new IllegalArgumentException("Zookeeper quorum is invalid: " + quorum + "; urlString=" + url, e);
+        }
+        conf.set(HConstants.ZOOKEEPER_QUORUM, quorums);
+
+        String port = m.group(2);
+        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, port);
+
+        String znodePath = m.group(3);
+        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodePath);
+
+        return conf;
+    }
+
+    /**
+     * @param table the identifier of hive table, in format <db_name>.<table_name>
+     * @return a string array with 2 elements: {"db_name", "table_name"}
+     */
+    public static String[] parseHiveTableName(String table) {
+        int cut = table.indexOf('.');
+        String database = cut >= 0 ? table.substring(0, cut).trim() : "DEFAULT";
+        String tableName = cut >= 0 ? table.substring(cut + 1).trim() : table.trim();
+
+        return new String[] { database, tableName };
+    }
+
+    public static void deletePath(Configuration conf, Path path) throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        if (fs.exists(path)) {
+            fs.delete(path, true);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/HiveClient.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HiveClient.java b/core-common/src/main/java/org/apache/kylin/common/util/HiveClient.java
new file mode 100644
index 0000000..a5be14e
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HiveClient.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/**
+ * Hive meta API client for Kylin
+ * @author shaoshi
+ *
+ */
+public class HiveClient {
+
+    protected HiveConf hiveConf = null;
+    protected Driver driver = null;
+    protected HiveMetaStoreClient metaStoreClient = null;
+
+    public HiveClient() {
+        hiveConf = new HiveConf(HiveClient.class);
+    }
+
+    public HiveClient(Map<String, String> configMap) {
+        this();
+        appendConfiguration(configMap);
+    }
+
+    public HiveConf getHiveConf() {
+        return hiveConf;
+    }
+
+    /**
+     * Get the hive ql driver to execute ddl or dml
+     * @return
+     */
+    private Driver getDriver() {
+        if (driver == null) {
+            driver = new Driver(hiveConf);
+            SessionState.start(new CliSessionState(hiveConf));
+        }
+
+        return driver;
+    }
+
+    /**
+     * Append or overwrite the default hive client configuration; You need call this before invoke #executeHQL;
+     * @param configMap
+     */
+    public void appendConfiguration(Map<String, String> configMap) {
+        if (configMap != null && configMap.size() > 0) {
+            for (Entry<String, String> e : configMap.entrySet()) {
+                hiveConf.set(e.getKey(), e.getValue());
+            }
+        }
+    }
+
+    /**
+     * 
+     * @param hql
+     * @throws CommandNeedRetryException
+     * @throws IOException
+     */
+    public void executeHQL(String hql) throws CommandNeedRetryException, IOException {
+        CommandProcessorResponse response = getDriver().run(hql);
+        int retCode = response.getResponseCode();
+        if (retCode != 0) {
+            String err = response.getErrorMessage();
+            throw new IOException("Failed to execute hql [" + hql + "], error message is: " + err);
+        }
+    }
+
+    public void executeHQL(String[] hqls) throws CommandNeedRetryException, IOException {
+        for (String sql : hqls)
+            executeHQL(sql);
+    }
+
+    private HiveMetaStoreClient getMetaStoreClient() throws Exception {
+        if (metaStoreClient == null) {
+            metaStoreClient = new HiveMetaStoreClient(hiveConf);
+        }
+        return metaStoreClient;
+    }
+
+    public Table getHiveTable(String database, String tableName) throws Exception {
+        return getMetaStoreClient().getTable(database, tableName);
+    }
+
+    public List<FieldSchema> getHiveTableFields(String database, String tableName) throws Exception {
+        return getMetaStoreClient().getFields(database, tableName);
+    }
+
+    public String getHiveTableLocation(String database, String tableName) throws Exception {
+        Table t = getHiveTable(database, tableName);
+        return t.getSd().getLocation();
+    }
+
+    public long getFileSizeForTable(Table table) {
+        return getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.TOTAL_SIZE);
+    }
+
+    public long getFileNumberForTable(Table table) {
+        return getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.NUM_FILES);
+    }
+
+    /**
+     * COPIED FROM org.apache.hadoop.hive.ql.stats.StatsUtil for backward compatibility
+     * 
+     * Get basic stats of table
+     * @param table
+     *          - table
+     * @param statType
+     *          - type of stats
+     * @return value of stats
+     */
+    public static long getBasicStatForTable(org.apache.hadoop.hive.ql.metadata.Table table, String statType) {
+        Map<String, String> params = table.getParameters();
+        long result = 0;
+
+        if (params != null) {
+            try {
+                result = Long.parseLong(params.get(statType));
+            } catch (NumberFormatException e) {
+                result = 0;
+            }
+        }
+        return result;
+    }
+
+    public boolean isNativeTable(String database, String tableName)  throws Exception{
+        return !MetaStoreUtils.isNonNativeTable(getMetaStoreClient().getTable(database, tableName));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/IdentityUtils.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/IdentityUtils.java b/core-common/src/main/java/org/apache/kylin/common/util/IdentityUtils.java
new file mode 100644
index 0000000..d873959
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/IdentityUtils.java
@@ -0,0 +1,34 @@
+package org.apache.kylin.common.util;
+
+import java.util.Collection;
+import java.util.IdentityHashMap;
+
+/**
+ */
+public class IdentityUtils {
+    public static <K> boolean collectionReferenceEquals(Collection<K> collectionA, Collection<K> collectionB) {
+        if (collectionA == null || collectionB == null) {
+            throw new RuntimeException("input must be not null");
+        }
+
+        IdentityHashMap<K, Void> mapA = new IdentityHashMap<>();
+        IdentityHashMap<K, Void> mapB = new IdentityHashMap<>();
+        for (K key : collectionA) {
+            mapA.put(key, null);
+        }
+        for (K key : collectionB) {
+            mapB.put(key, null);
+        }
+
+        if (mapA.keySet().size() != mapB.keySet().size()) {
+            return false;
+        }
+
+        for (K key : mapA.keySet()) {
+            if (!mapB.keySet().contains(key)) {
+                return false;
+            }
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
new file mode 100644
index 0000000..c401425
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
@@ -0,0 +1,131 @@
+package org.apache.kylin.common.util;
+
+import java.util.BitSet;
+
+public class ImmutableBitSet {
+
+    public static final ImmutableBitSet EMPTY = new ImmutableBitSet(new BitSet());
+
+    final private BitSet set;
+    final private int[] arr;
+
+    public ImmutableBitSet(int index) {
+        this(newBitSet(index));
+    }
+
+    private static BitSet newBitSet(int index) {
+        BitSet set = new BitSet(index);
+        set.set(index);
+        return set;
+    }
+
+    public ImmutableBitSet(int indexFrom, int indexTo) {
+        this(newBitSet(indexFrom, indexTo));
+    }
+    
+    private static BitSet newBitSet(int indexFrom, int indexTo) {
+        BitSet set = new BitSet(indexTo);
+        set.set(indexFrom, indexTo);
+        return set;
+    }
+
+    public ImmutableBitSet(BitSet set) {
+        this.set = (BitSet) set.clone();
+        this.arr = new int[set.cardinality()];
+
+        int j = 0;
+        for (int i = set.nextSetBit(0); i >= 0; i = set.nextSetBit(i + 1)) {
+            arr[j++] = i;
+        }
+    }
+
+    /** return number of true bits */
+    public int trueBitCount() {
+        return arr.length;
+    }
+
+    /** return the i-th true bit */
+    public int trueBitAt(int i) {
+        return arr[i];
+    }
+    
+    /** return the bit's index among true bits */
+    public int trueBitIndexOf(int bitIndex) {
+        for (int i = 0; i < arr.length; i++) {
+            if (arr[i] == bitIndex)
+                return i;
+        }
+        return -1;
+    }
+
+    public BitSet mutable() {
+        return (BitSet) set.clone();
+    }
+
+    public ImmutableBitSet set(int bitIndex) {
+        return set(bitIndex, true);
+    }
+    
+    public ImmutableBitSet set(int bitIndex, boolean value) {
+        if (set.get(bitIndex) == value) {
+            return this;
+        } else {
+            BitSet mutable = mutable();
+            mutable.set(bitIndex, value);
+            return new ImmutableBitSet(mutable);
+        }
+    }
+    
+    public ImmutableBitSet or(ImmutableBitSet another) {
+        BitSet mutable = mutable();
+        mutable.or(another.set);
+        return new ImmutableBitSet(mutable);
+    }
+    
+    public ImmutableBitSet andNot(ImmutableBitSet another) {
+        BitSet mutable = mutable();
+        mutable.andNot(another.set);
+        return new ImmutableBitSet(mutable);
+    }
+
+    @Override
+    public int hashCode() {
+        return set.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+
+        ImmutableBitSet other = (ImmutableBitSet) obj;
+        return this.set.equals(other.set);
+    }
+    
+    @Override
+    public String toString() {
+        return set.toString();
+    }
+
+    // ============================================================================
+
+    public boolean get(int bitIndex) {
+        return set.get(bitIndex);
+    }
+
+    public int cardinality() {
+        return set.cardinality();
+    }
+
+    public boolean intersects(ImmutableBitSet another) {
+        return set.intersects(another.set);
+    }
+
+    public boolean isEmpty() {
+        return set.isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
new file mode 100644
index 0000000..5b3a22c
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
+import java.io.*;
+
+public class JsonUtil {
+
+    // reuse the object mapper to save memory footprint
+    private static final ObjectMapper mapper = new ObjectMapper();
+    private static final ObjectMapper indentMapper = new ObjectMapper();
+
+
+    static {
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        indentMapper.configure(SerializationFeature.INDENT_OUTPUT, true);
+    }
+
+    public static <T> T readValue(File src, Class<T> valueType) throws IOException, JsonParseException, JsonMappingException {
+        return mapper.readValue(src, valueType);
+    }
+
+    public static <T> T readValue(String content, Class<T> valueType) throws IOException, JsonParseException, JsonMappingException {
+        return mapper.readValue(content, valueType);
+    }
+
+    public static <T> T readValue(Reader src, Class<T> valueType) throws IOException, JsonParseException, JsonMappingException {
+        return mapper.readValue(src, valueType);
+    }
+
+    public static <T> T readValue(InputStream src, Class<T> valueType) throws IOException, JsonParseException, JsonMappingException {
+        return mapper.readValue(src, valueType);
+    }
+
+    public static <T> T readValue(byte[] src, Class<T> valueType) throws IOException, JsonParseException, JsonMappingException {
+        return mapper.readValue(src, valueType);
+    }
+
+    public static void writeValueIndent(OutputStream out, Object value) throws IOException, JsonGenerationException, JsonMappingException {
+        indentMapper.writeValue(out, value);
+    }
+
+    public static void writeValue(OutputStream out, Object value) throws IOException, JsonGenerationException, JsonMappingException {
+        mapper.writeValue(out, value);
+    }
+
+    public static String writeValueAsString(Object value) throws JsonProcessingException {
+        return mapper.writeValueAsString(value);
+    }
+
+    public static byte[] writeValueAsBytes(Object value) throws JsonProcessingException {
+        return mapper.writeValueAsBytes(value);
+    }
+
+    public static String writeValueAsIndentString(Object value) throws JsonProcessingException {
+        return indentMapper.writeValueAsString(value);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/LogTitlePrinter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/LogTitlePrinter.java b/core-common/src/main/java/org/apache/kylin/common/util/LogTitlePrinter.java
new file mode 100644
index 0000000..f7d3a16
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/LogTitlePrinter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+/**
+ */
+public class LogTitlePrinter {
+    public static void printTitle(String title) {
+        String leftAlignFormat = "| %-100s | %n";
+
+        System.out.format("+------------------------------------------------------------------------------------------------------+%n");
+        System.out.format(leftAlignFormat, title);
+        System.out.format("+------------------------------------------------------------------------------------------------------+%n");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/Logger.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Logger.java b/core-common/src/main/java/org/apache/kylin/common/util/Logger.java
new file mode 100644
index 0000000..323da88
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Logger.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+/**
+ * @author ysong1
+ * 
+ */
+public interface Logger {
+    public void log(String message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/MailService.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/MailService.java b/core-common/src/main/java/org/apache/kylin/common/util/MailService.java
new file mode 100644
index 0000000..7285520
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/MailService.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.mail.Email;
+import org.apache.commons.mail.EmailException;
+import org.apache.commons.mail.HtmlEmail;
+
+import org.apache.kylin.common.KylinConfig;
+
+/**
+ * @author xduo
+ */
+public class MailService {
+
+    private Boolean enabled = Boolean.TRUE;
+    private String host;
+    private String username;
+    private String password;
+    private String sender;
+
+    private static final Log logger = LogFactory.getLog(MailService.class);
+
+    public MailService() {
+        this(KylinConfig.getInstanceFromEnv());
+    }
+
+    public MailService(KylinConfig config) {
+        this("true".equalsIgnoreCase(config.getProperty(KylinConfig.MAIL_ENABLED, "false")),
+                config.getProperty(KylinConfig.MAIL_HOST, ""),
+                config.getProperty(KylinConfig.MAIL_USERNAME, ""),
+                config.getProperty(KylinConfig.MAIL_PASSWORD, ""),
+                config.getProperty(KylinConfig.MAIL_SENDER, "")
+                );
+    }
+
+    public MailService(boolean enabled, String host, String username, String password, String sender) {
+        this.enabled = enabled;
+        this.host = host;
+        this.username = username;
+        this.password = password;
+        this.sender = sender;
+
+        if (enabled) {
+            if (host.isEmpty()) {
+                throw new RuntimeException("mail service host is empty");
+            }
+        }
+    }
+
+    /**
+     * @param receivers
+     * @param subject
+     * @param content
+     * @return true or false indicating whether the email was delivered successfully
+     * @throws IOException
+     */
+    public boolean sendMail(List<String> receivers, String subject, String content) {
+        return sendMail(receivers, subject, content, true);
+    }
+
+    /**
+     * @param receivers
+     * @param subject
+     * @param content
+     * @return true or false indicating whether the email was delivered successfully
+     * @throws IOException
+     */
+    public boolean sendMail(List<String> receivers, String subject, String content, boolean isHtmlMsg) {
+
+        if (!enabled) {
+            logger.info("Email service is disabled; this mail will not be delivered: " + subject);
+            logger.info("To enable mail service, set 'mail.enabled=true' in kylin.properties");
+            return false;
+        }
+
+        Email email = new HtmlEmail();
+        email.setHostName(host);
+        if (username != null && username.trim().length() > 0) {
+            email.setAuthentication(username, password);
+        }
+
+        //email.setDebug(true);
+        try {
+            for (String receiver : receivers) {
+                email.addTo(receiver);
+            }
+
+            email.setFrom(sender);
+            email.setSubject(subject);
+            email.setCharset("UTF-8");
+            if (isHtmlMsg) {
+                ((HtmlEmail) email).setHtmlMsg(content);
+            } else {
+                ((HtmlEmail) email).setTextMsg(content);
+            }
+            email.send();
+            email.getMailSession();
+
+        } catch (EmailException e) {
+            logger.error(e.getLocalizedMessage(),e);
+            return false;
+        }
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
new file mode 100644
index 0000000..19302bf
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -0,0 +1,248 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class MemoryBudgetController {
+
+    private static final boolean debug = true;
+
+    public interface MemoryConsumer {
+        // return number MB released
+        int freeUp(int mb);
+    }
+
+    @SuppressWarnings("serial")
+    public static class NotEnoughBudgetException extends IllegalStateException {
+
+        public NotEnoughBudgetException() {
+            super();
+        }
+
+        public NotEnoughBudgetException(Throwable cause) {
+            super(cause);
+        }
+    }
+
+    private static class ConsumerEntry {
+        final MemoryConsumer consumer;
+        int reservedMB;
+
+        ConsumerEntry(MemoryConsumer consumer) {
+            this.consumer = consumer;
+        }
+    }
+
+    public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
+    public static final int ONE_MB = 1024 * 1024;
+
+    private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
+
+    // all budget numbers are in MB
+    private final int totalBudgetMB;
+    private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
+    private int totalReservedMB;
+    private final ReentrantLock lock = new ReentrantLock();
+
+    public MemoryBudgetController(int totalBudgetMB) {
+        Preconditions.checkArgument(totalBudgetMB >= 0);
+        Preconditions.checkState(totalBudgetMB <= getSystemAvailMB());
+        this.totalBudgetMB = totalBudgetMB;
+        this.totalReservedMB = 0;
+    }
+
+    public int getTotalBudgetMB() {
+        return totalBudgetMB;
+    }
+
+    public int getTotalReservedMB() {
+        lock.lock();
+        try {
+            return totalReservedMB;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public int getRemainingBudgetMB() {
+        lock.lock();
+        try {
+            return totalBudgetMB - totalReservedMB;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void reserveInsist(MemoryConsumer consumer, int requestMB) {
+        long waitStart = 0;
+        while (true) {
+            try {
+                reserve(consumer, requestMB);
+                if (debug && waitStart > 0)
+                    logger.debug(consumer + " waited " + (System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB request");
+                return;
+            } catch (NotEnoughBudgetException ex) {
+                // retry
+            }
+
+            if (waitStart == 0)
+                waitStart = System.currentTimeMillis();
+
+            synchronized (lock) {
+                try {
+                    lock.wait();
+                } catch (InterruptedException e) {
+                    throw new NotEnoughBudgetException(e);
+                }
+            }
+        }
+    }
+
+    /** reserve without wait, fail with NotEnoughBudgetException immediately if no mem */
+    public void reserve(MemoryConsumer consumer, int requestMB) {
+        if (totalBudgetMB == 0 && requestMB > 0)
+            throw new NotEnoughBudgetException();
+
+        boolean ok = false;
+        while (!ok) {
+            int gap = calculateGap(consumer, requestMB);
+            if (gap > 0) {
+                // to void deadlock, don't hold lock when invoking consumer.freeUp()
+                tryFreeUp(gap);
+            }
+            ok = updateBooking(consumer, requestMB);
+        }
+    }
+
+    private int calculateGap(MemoryConsumer consumer, int requestMB) {
+        lock.lock();
+        try {
+            ConsumerEntry entry = booking.get(consumer);
+            int curMB = entry == null ? 0 : entry.reservedMB;
+            int delta = requestMB - curMB;
+            return delta - (totalBudgetMB - totalReservedMB);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void tryFreeUp(int gap) {
+        // note don't hold lock when calling consumer.freeUp(), that method holding lock for itself and may cause deadlock
+        for (ConsumerEntry entry : booking.values()) {
+            int mb = entry.consumer.freeUp(gap);
+            if (mb > 0) {
+                lock.lock();
+                try {
+                    updateBookingWithDelta(entry.consumer, -mb);
+                } finally {
+                    lock.unlock();
+                }
+                gap -= mb;
+                if (gap <= 0)
+                    break;
+            }
+        }
+        if (gap > 0)
+            throw new NotEnoughBudgetException();
+
+        if (debug) {
+            if (getSystemAvailMB() < getRemainingBudgetMB()) {
+                logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
+            }
+        }
+    }
+
+    private boolean updateBooking(MemoryConsumer consumer, int requestMB) {
+        lock.lock();
+        try {
+            ConsumerEntry entry = booking.get(consumer);
+            if (entry == null) {
+                if (requestMB == 0)
+                    return true;
+
+                entry = new ConsumerEntry(consumer);
+                booking.put(consumer, entry);
+            }
+
+            int delta = requestMB - entry.reservedMB;
+            return updateBookingWithDelta(consumer, delta);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    // lock MUST be obtained before entering
+    private boolean updateBookingWithDelta(MemoryConsumer consumer, int delta) {
+        if (delta == 0)
+            return true;
+
+        ConsumerEntry entry = booking.get(consumer);
+        if (entry == null) {
+            if (delta <= 0)
+                return true;
+
+            entry = new ConsumerEntry(consumer);
+            booking.put(consumer, entry);
+        }
+
+        // double check gap again, it may be changed by other concurrent requests
+        if (delta > 0) {
+            int gap = delta - (totalBudgetMB - totalReservedMB);
+            if (gap > 0)
+                return false;
+        }
+
+        totalReservedMB += delta;
+        entry.reservedMB += delta;
+        if (entry.reservedMB == 0) {
+            booking.remove(entry.consumer);
+        }
+        if (debug) {
+            logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
+        }
+
+        if (delta < 0) {
+            synchronized (lock) {
+                lock.notifyAll();
+            }
+        }
+
+        return true;
+    }
+
+    public static long getSystemAvailBytes() {
+        Runtime runtime = Runtime.getRuntime();
+        long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+        long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
+        long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+        long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+        return availableMemory;
+    }
+
+    public static int getSystemAvailMB() {
+        return (int) (getSystemAvailBytes() / ONE_MB);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/MyLogFormatter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/MyLogFormatter.java b/core-common/src/main/java/org/apache/kylin/common/util/MyLogFormatter.java
new file mode 100644
index 0000000..21cb41d
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/MyLogFormatter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.logging.Formatter;
+import java.util.logging.LogRecord;
+
+public class MyLogFormatter extends Formatter {
+
+    Date dat = new Date();
+
+    // Line separator string. This is the value of the line.separator
+    // property at the moment that the SimpleFormatter was created.
+    private String lineSeparator = "\n";
+
+    /**
+     * Format the given LogRecord.
+     * 
+     * @param record
+     *            the log record to be formatted.
+     * @return a formatted log record
+     */
+    public synchronized String format(LogRecord record) {
+        StringBuffer sb = new StringBuffer();
+        // Minimize memory allocations here.
+        Timestamp ts = new Timestamp(record.getMillis());
+        String text = ts.toString();
+        sb.append("JUL ");
+        sb.append(text);
+        sb.append(" ");
+        if (record.getSourceClassName() != null) {
+            sb.append(record.getSourceClassName());
+        } else {
+            sb.append(record.getLoggerName());
+        }
+        if (record.getSourceMethodName() != null) {
+            sb.append(" ");
+            sb.append(record.getSourceMethodName());
+        }
+        sb.append(lineSeparator);
+        String message = formatMessage(record);
+        sb.append(record.getLevel().getLocalizedName());
+        sb.append(": ");
+        sb.append(message);
+        sb.append(lineSeparator);
+        if (record.getThrown() != null) {
+            try {
+                StringWriter sw = new StringWriter();
+                PrintWriter pw = new PrintWriter(sw);
+                record.getThrown().printStackTrace(pw);
+                pw.close();
+                sb.append(sw.toString());
+            } catch (Exception ex) {
+            }
+        }
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Pair.java b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
new file mode 100644
index 0000000..bd24401
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
@@ -0,0 +1,132 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import java.io.Serializable;
+
+
+/**
+ * A generic class for pairs. Copied from org.apache.hadoop.hbase.util.Pair
+ * @param <T1>
+ * @param <T2>
+ */
+public class Pair<T1, T2> implements Serializable
+{
+    private static final long serialVersionUID = -3986244606585552569L;
+    protected T1 first = null;
+    protected T2 second = null;
+
+    /**
+     * Default constructor.
+     */
+    public Pair()
+    {
+    }
+
+    /**
+     * Constructor
+     * @param a operand
+     * @param b operand
+     */
+    public Pair(T1 a, T2 b)
+    {
+        this.first = a;
+        this.second = b;
+    }
+
+    /**
+     * Constructs a new pair, inferring the type via the passed arguments
+     * @param <T1> type for first
+     * @param <T2> type for second
+     * @param a first element
+     * @param b second element
+     * @return a new pair containing the passed arguments
+     */
+    public static <T1,T2> Pair<T1,T2> newPair(T1 a, T2 b) {
+        return new Pair<T1,T2>(a, b);
+    }
+
+    /**
+     * Replace the first element of the pair.
+     * @param a operand
+     */
+    public void setFirst(T1 a)
+    {
+        this.first = a;
+    }
+
+    /**
+     * Replace the second element of the pair.
+     * @param b operand
+     */
+    public void setSecond(T2 b)
+    {
+        this.second = b;
+    }
+
+    /**
+     * Return the first element stored in the pair.
+     * @return T1
+     */
+    public T1 getFirst()
+    {
+        return first;
+    }
+
+    /**
+     * Return the second element stored in the pair.
+     * @return T2
+     */
+    public T2 getSecond()
+    {
+        return second;
+    }
+
+    private static boolean equals(Object x, Object y)
+    {
+        return (x == null && y == null) || (x != null && x.equals(y));
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public boolean equals(Object other)
+    {
+        return other instanceof Pair && equals(first, ((Pair)other).first) &&
+                equals(second, ((Pair)other).second);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        if (first == null)
+            return (second == null) ? 0 : second.hashCode() + 1;
+        else if (second == null)
+            return first.hashCode() + 2;
+        else
+            return first.hashCode() * 17 + second.hashCode();
+    }
+
+    @Override
+    public String toString()
+    {
+        return "{" + getFirst() + "," + getSecond() + "}";
+    }
+}
+