You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/22 06:13:29 UTC

[39/47] incubator-kylin git commit: KYLIN-875 rename modules: core-common, core-cube, core-dictionary, core-cube

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
new file mode 100644
index 0000000..cc65833
--- /dev/null
+++ b/core-common/pom.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+ 
+     http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kylin-core-common</artifactId>
+    <packaging>jar</packaging>
+    <name>Kylin:Core-Common</name>
+
+    <parent>
+        <groupId>org.apache.kylin</groupId>
+        <artifactId>kylin</artifactId>
+        <version>0.7.2-incubating-SNAPSHOT</version>
+    </parent>
+
+    <properties>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-configuration</groupId>
+            <artifactId>commons-configuration</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.ning</groupId>
+            <artifactId>compress-lzf</artifactId>
+        </dependency>
+
+        <!-- Env & Test -->
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-email</artifactId>
+            <version>1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <scope>provided</scope>
+            <!-- protobuf version conflict with hbase -->
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-testing-util</artifactId>
+            <version>${hbase-hadoop2.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet.jsp</groupId>
+                    <artifactId>jsp-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.reflections</groupId>
+            <artifactId>reflections</artifactId>
+            <version>0.9.9-RC1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+            <version>1.2</version>
+        </dependency>
+         <dependency>
+            <groupId>org.apache.hive.hcatalog</groupId>
+            <artifactId>hive-hcatalog-core</artifactId>
+            <version>${hive-hcatalog.version}</version>
+            <scope>provided</scope>
+          </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
new file mode 100644
index 0000000..89a7fcc
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.SortedSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * @author yangli9
+ */
+public class KylinConfig {
+
+    public static final String KYLIN_STORAGE_URL = "kylin.storage.url";
+
+    public static final String KYLIN_METADATA_URL = "kylin.metadata.url";
+
+    public static final String KYLIN_REST_SERVERS = "kylin.rest.servers";
+
+    public static final String KYLIN_REST_TIMEZONE = "kylin.rest.timezone";
+    /**
+     * The dir containing scripts for kylin. For example: /usr/lib/kylin/bin
+     */
+    public static final String KYLIN_SCRIPT_DIR = "kylin.script.dir";
+    /**
+     * The script file name for generating table metadata from hive. For example:
+     * generateTable.sh
+     */
+    public static final String KYLIN_SCRIPT_GEN_TABLE_META = "kylin.script.genTableMeta";
+
+    public static final String KYLIN_JOB_CONCURRENT_MAX_LIMIT = "kylin.job.concurrent.max.limit";
+
+    public static final String KYLIN_JOB_YARN_APP_REST_CHECK_URL = "kylin.job.yarn.app.rest.check.status.url";
+
+    public static final String KYLIN_JOB_YARN_APP_REST_CHECK_INTERVAL_SECONDS = "kylin.job.yarn.app.rest.check.interval.seconds";
+
+    public static final String KYLIN_TMP_HDFS_DIR = "kylin.tmp.hdfs.dir";
+
+    public static final String HIVE_TABLE_LOCATION_PREFIX = "hive.table.location.";
+
+    public static final String KYLIN_JOB_REMOTE_CLI_PASSWORD = "kylin.job.remote.cli.password";
+
+    public static final String KYLIN_JOB_REMOTE_CLI_USERNAME = "kylin.job.remote.cli.username";
+
+    public static final String KYLIN_JOB_REMOTE_CLI_HOSTNAME = "kylin.job.remote.cli.hostname";
+
+    public static final String KYLIN_JOB_REMOTE_CLI_PORT = "kylin.job.remote.cli.port";
+
+    public static final String KYLIN_JOB_REMOTE_CLI_WORKING_DIR = "kylin.job.remote.cli.working.dir";
+
+    public static final String KYLIN_JOB_CMD_EXTRA_ARGS = "kylin.job.cmd.extra.args";
+    /**
+     * Toggle to indicate whether CLI commands are run on a remote host (see
+     * getCliCommandExecutor). Treated as false when the property is unset.
+     */
+    public static final String KYLIN_JOB_RUN_AS_REMOTE_CMD = "kylin.job.run.as.remote.cmd";
+
+    public static final String KYLIN_JOB_MAPREDUCE_DEFAULT_REDUCE_COUNT_RATIO = "kylin.job.mapreduce.default.reduce.count.ratio";
+
+    public static final String KYLIN_JOB_MAPREDUCE_DEFAULT_REDUCE_INPUT_MB = "kylin.job.mapreduce.default.reduce.input.mb";
+
+    public static final String KYLIN_JOB_MAPREDUCE_MAX_REDUCER_NUMBER = "kylin.job.mapreduce.max.reducer.number";
+
+    public static final String KYLIN_JOB_JAR = "kylin.job.jar";
+
+    public static final String COPROCESSOR_LOCAL_JAR = "kylin.coprocessor.local.jar";
+
+    public static final String KYLIN_JOB_LOG_DIR = "kylin.job.log.dir";
+
+    public static final String KYLIN_JOB_CUBING_IN_MEM = "kylin.job.cubing.inMem";
+    public static final String KYLIN_JOB_CUBING_IN_MEM_SAMPLING_PERCENT = "kylin.job.cubing.inMem.sampling.percent";
+
+    public static final String KYLIN_HDFS_WORKING_DIR = "kylin.hdfs.working.dir";
+
+    public static final String HIVE_PASSWORD = "hive.password";
+
+    public static final String HIVE_USER = "hive.user";
+
+    public static final String HIVE_URL = "hive.url";
+    /**
+     * Key string to specify the kylin env: prod, dev, qa
+     */
+    public static final String KYLIN_ENV = "KYLIN_ENV";
+    /**
+     * Default Kylin conf path
+     */
+    public static final String KYLIN_CONF_DEFAULT = "/etc/kylin";
+    /**
+     * Kylin properties file
+     */
+    public static final String KYLIN_CONF_PROPERTIES_FILE = "kylin.properties";
+
+    public static final String MAIL_ENABLED = "mail.enabled";
+
+    public static final String MAIL_HOST = "mail.host";
+
+    public static final String MAIL_USERNAME = "mail.username";
+
+    public static final String MAIL_PASSWORD = "mail.password";
+
+    public static final String MAIL_SENDER = "mail.sender";
+
+    public static final String KYLIN_HOME = "KYLIN_HOME";
+    public static final String KYLIN_CONF = "KYLIN_CONF";
+
+    public static final String HBASE_REGION_CUT_SMALL = "kylin.job.hbase.region.cut.small";
+    public static final String HBASE_REGION_CUT_MEDIUM = "kylin.job.hbase.region.cut.medium";
+    public static final String HBASE_REGION_CUT_LARGE = "kylin.job.hbase.region.cut.large";
+
+
+    public static final String SPARK_HOME = "kylin.spark.home";
+    public static final String SPARK_MASTER = "kylin.spark.master";
+
+    private static final Logger logger = LoggerFactory.getLogger(KylinConfig.class);
+
+    public static final String VERSION = "${project.version}";
+
+    // static cached instances
+    private static KylinConfig ENV_INSTANCE = null;
+
+    /**
+     * Returns the cached environment-based config, loading it on first use.
+     *
+     * @return the shared KylinConfig instance
+     * @throws IllegalStateException if the config cannot be found
+     */
+    public static KylinConfig getInstanceFromEnv() {
+        if (ENV_INSTANCE != null) {
+            return ENV_INSTANCE;
+        }
+        try {
+            ENV_INSTANCE = loadKylinConfig();
+        } catch (IllegalArgumentException e) {
+            throw new IllegalStateException("Failed to find KylinConfig ", e);
+        }
+        return ENV_INSTANCE;
+    }
+
+    public static void destoryInstance() {
+        ENV_INSTANCE = null;
+    }
+
+    public static enum UriType {
+        PROPERTIES_FILE, REST_ADDR, LOCAL_FOLDER
+    }
+
+    /**
+     * Classifies a metadata URI as a local folder, a kylin.properties file or
+     * a REST address.
+     * <p>
+     * A URI that names an existing path, or that contains a '/', is treated as
+     * a local path; if it does not exist yet it is created as a directory.
+     * Anything else must match the full REST pattern checked by RestClient.
+     *
+     * @param metaUri the metadata URI to classify
+     * @return the detected UriType
+     * @throws IllegalStateException if the URI cannot be classified; the
+     *         underlying failure is attached as the cause
+     */
+    private static UriType decideUriType(String metaUri) {
+
+        try {
+            File file = new File(metaUri);
+            if (file.exists() || metaUri.contains("/")) {
+                if (!file.exists()) {
+                    // a missing local path is materialized as a folder
+                    file.mkdirs();
+                }
+                if (file.isDirectory()) {
+                    return UriType.LOCAL_FOLDER;
+                } else if (file.isFile()) {
+                    if (file.getName().equalsIgnoreCase(KYLIN_CONF_PROPERTIES_FILE)) {
+                        return UriType.PROPERTIES_FILE;
+                    } else {
+                        throw new IllegalStateException("Metadata uri : " + metaUri + " is a local file but not kylin.properties");
+                    }
+                } else {
+                    throw new IllegalStateException("Metadata uri : " + metaUri + " looks like a file but it's neither a file nor a directory");
+                }
+            } else {
+                if (RestClient.matchFullRestPattern(metaUri))
+                    return UriType.REST_ADDR;
+                else
+                    throw new IllegalStateException("Metadata uri : " + metaUri + " is not a valid REST URI address");
+            }
+        } catch (Exception e) {
+            logger.info(e.getLocalizedMessage());
+            // keep the specific failure as the cause instead of discarding it
+            throw new IllegalStateException("Metadata uri : " + metaUri + " is not recognized", e);
+        }
+    }
+
+    public static KylinConfig createInstanceFromUri(String uri) {
+        /**
+         * --hbase: 1. PROPERTIES_FILE: path to kylin.properties 2. REST_ADDR:
+         * rest service resource, format: user:password@host:port --local: 1.
+         * LOCAL_FOLDER: path to resource folder
+         */
+        UriType uriType = decideUriType(uri);
+        logger.info("The URI " + uri + " is recognized as " + uriType);
+
+        if (uriType == UriType.LOCAL_FOLDER) {
+            KylinConfig config = new KylinConfig();
+            config.setMetadataUrl(uri);
+            return config;
+        } else if (uriType == UriType.PROPERTIES_FILE) {
+            KylinConfig config;
+            try {
+                config = new KylinConfig();
+                InputStream is = new FileInputStream(uri);
+                config.reloadKylinConfig(is);
+                is.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            return config;
+        } else {// rest_addr
+            try {
+                KylinConfig config = new KylinConfig();
+                RestClient client = new RestClient(uri);
+                String propertyText = client.getKylinProperties();
+                InputStream is = IOUtils.toInputStream(propertyText);
+                config.reloadKylinConfig(is);
+                is.close();
+                return config;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static KylinConfig getKylinConfigFromInputStream(InputStream is) {
+        KylinConfig config = new KylinConfig();
+        config.reloadKylinConfig(is);
+        return config;
+    }
+
+    // ============================================================================
+
+    /**
+     * Loads the Kylin config from the environment. The properties stream comes
+     * from getKylinPropertiesAsInputSteam(), which looks for kylin.properties
+     * in the directory named by the KYLIN_CONF system property or, when that
+     * is not set, in the "conf" directory under the KYLIN_HOME environment
+     * variable.
+     *
+     * @return a new KylinConfig populated from the located properties
+     * @throws IllegalArgumentException if no properties stream is returned
+     */
+    private static KylinConfig loadKylinConfig() {
+        InputStream is = getKylinPropertiesAsInputSteam();
+        if (is == null) {
+            throw new IllegalArgumentException("Failed to load kylin config");
+        }
+        KylinConfig config = new KylinConfig();
+        config.reloadKylinConfig(is);
+        return config;
+    }
+
+    private PropertiesConfiguration kylinConfig = new PropertiesConfiguration();
+
+    private String metadataUrl;
+    private String storageUrl;
+
+    public CliCommandExecutor getCliCommandExecutor() throws IOException {
+        CliCommandExecutor exec = new CliCommandExecutor();
+        if (getRunAsRemoteCommand()) {
+            exec.setRunAtRemote(getRemoteHadoopCliHostname(), getRemoteHadoopCliPort(), getRemoteHadoopCliUsername(), getRemoteHadoopCliPassword());
+        }
+        return exec;
+    }
+
+    // ============================================================================
+
+    public String getStorageUrl() {
+        return storageUrl;
+    }
+
+    public String getHiveUrl() {
+        return getOptional(HIVE_URL, "");
+    }
+
+    public String getHiveUser() {
+        return getOptional(HIVE_USER, "");
+    }
+
+    public String getHivePassword() {
+        return getOptional(HIVE_PASSWORD, "");
+    }
+
+    public String getHdfsWorkingDirectory() {
+        String root = getRequired(KYLIN_HDFS_WORKING_DIR);
+        if (!root.endsWith("/")) {
+            root += "/";
+        }
+        return root + getMetadataUrlPrefix();
+    }
+
+    public String getKylinJobLogDir() {
+        return getOptional(KYLIN_JOB_LOG_DIR, "/tmp/kylin/logs");
+    }
+
+    public String getKylinJobJarPath() {
+        final String jobJar = getOptional(KYLIN_JOB_JAR);
+        if (StringUtils.isNotEmpty(jobJar)) {
+            return jobJar;
+        }
+        String kylinHome = getKylinHome();
+        if (StringUtils.isEmpty(kylinHome)) {
+            return "";
+        }
+        return getFileName(kylinHome + File.separator + "lib", JOB_JAR_NAME_PATTERN);
+    }
+
+    public void overrideKylinJobJarPath(String path) {
+        logger.info("override " + KYLIN_JOB_JAR + " to " + path);
+        System.setProperty(KYLIN_JOB_JAR, path);
+    }
+
+    private static final Pattern COPROCESSOR_JAR_NAME_PATTERN = Pattern.compile("kylin-coprocessor-(.+)\\.jar");
+    private static final Pattern JOB_JAR_NAME_PATTERN = Pattern.compile("kylin-job-(.+)\\.jar");
+
+    public String getCoprocessorLocalJar() {
+        final String coprocessorJar = getOptional(COPROCESSOR_LOCAL_JAR);
+        if (StringUtils.isNotEmpty(coprocessorJar)) {
+            return coprocessorJar;
+        }
+        String kylinHome = getKylinHome();
+        if (StringUtils.isEmpty(kylinHome)) {
+            throw new RuntimeException("getCoprocessorLocalJar needs KYLIN_HOME");
+        }
+        return getFileName(kylinHome + File.separator + "lib", COPROCESSOR_JAR_NAME_PATTERN);
+    }
+
+    /**
+     * Finds, directly under the given directory, the file whose name matches
+     * the pattern; when several match, the one whose absolute path sorts last
+     * is returned.
+     *
+     * @param homePath directory to search
+     * @param pattern  pattern the whole file name must match
+     * @return absolute path of the selected file
+     * @throws RuntimeException if no file in the directory matches
+     */
+    private static String getFileName(String homePath, Pattern pattern) {
+        File home = new File(homePath);
+        SortedSet<String> files = Sets.newTreeSet();
+        // listFiles() returns null when the path is not a directory or an
+        // I/O error occurs, so it must be checked before iterating
+        File[] candidates = home.listFiles();
+        if (candidates != null) {
+            for (File file : candidates) {
+                if (pattern.matcher(file.getName()).matches()) {
+                    files.add(file.getAbsolutePath());
+                }
+            }
+        }
+        if (files.isEmpty()) {
+            throw new RuntimeException("cannot find " + pattern.toString() + " in " + homePath);
+        } else {
+            return files.last();
+        }
+    }
+
+    public void overrideCoprocessorLocalJar(String path) {
+        logger.info("override " + COPROCESSOR_LOCAL_JAR + " to " + path);
+        System.setProperty(COPROCESSOR_LOCAL_JAR, path);
+    }
+
+    public double getDefaultHadoopJobReducerInputMB() {
+        return Double.parseDouble(getOptional(KYLIN_JOB_MAPREDUCE_DEFAULT_REDUCE_INPUT_MB, "500"));
+    }
+
+    public double getDefaultHadoopJobReducerCountRatio() {
+        return Double.parseDouble(getOptional(KYLIN_JOB_MAPREDUCE_DEFAULT_REDUCE_COUNT_RATIO, "1.0"));
+    }
+
+    public int getHadoopJobMaxReducerNumber() {
+        return Integer.parseInt(getOptional(KYLIN_JOB_MAPREDUCE_MAX_REDUCER_NUMBER, "5000"));
+    }
+
+    public boolean getRunAsRemoteCommand() {
+        return Boolean.parseBoolean(getOptional(KYLIN_JOB_RUN_AS_REMOTE_CMD));
+    }
+
+    public int getRemoteHadoopCliPort() {
+        return Integer.parseInt(getOptional(KYLIN_JOB_REMOTE_CLI_PORT, "22"));
+    }
+
+    public String getRemoteHadoopCliHostname() {
+        return getOptional(KYLIN_JOB_REMOTE_CLI_HOSTNAME);
+    }
+
+    public String getRemoteHadoopCliUsername() {
+        return getOptional(KYLIN_JOB_REMOTE_CLI_USERNAME);
+    }
+
+    public String getRemoteHadoopCliPassword() {
+        return getOptional(KYLIN_JOB_REMOTE_CLI_PASSWORD);
+    }
+
+    public String getCliWorkingDir() {
+        return getOptional(KYLIN_JOB_REMOTE_CLI_WORKING_DIR);
+    }
+
+    public String getMapReduceCmdExtraArgs() {
+        return getOptional(KYLIN_JOB_CMD_EXTRA_ARGS);
+    }
+
+    public String getOverrideHiveTableLocation(String table) {
+        return getOptional(HIVE_TABLE_LOCATION_PREFIX + table.toUpperCase());
+    }
+
+    public String getTempHDFSDir() {
+        return getOptional(KYLIN_TMP_HDFS_DIR, "/tmp/kylin");
+    }
+
+    public String getYarnStatusCheckUrl() {
+        return getOptional(KYLIN_JOB_YARN_APP_REST_CHECK_URL, null);
+    }
+
+    public int getYarnStatusCheckIntervalSeconds() {
+        return Integer.parseInt(getOptional(KYLIN_JOB_YARN_APP_REST_CHECK_INTERVAL_SECONDS, "60"));
+    }
+
+    public int getMaxConcurrentJobLimit() {
+        return Integer.parseInt(getOptional(KYLIN_JOB_CONCURRENT_MAX_LIMIT, "10"));
+    }
+
+    public String getTimeZone() {
+        return getOptional(KYLIN_REST_TIMEZONE, "PST");
+    }
+
+    public String[] getRestServers() {
+        return getOptionalStringArray(KYLIN_REST_SERVERS);
+    }
+
+    public String getAdminDls() {
+        return getOptional("kylin.job.admin.dls", null);
+    }
+
+    public long getJobStepTimeout() {
+        return Long.parseLong(getOptional("kylin.job.step.timeout", String.valueOf(2 * 60 * 60)));
+    }
+
+    public String getServerMode() {
+        return this.getOptional("kylin.server.mode", "all");
+    }
+
+    public int getDictionaryMaxCardinality() {
+        return Integer.parseInt(getOptional("kylin.dictionary.max.cardinality", "5000000"));
+    }
+
+    public int getTableSnapshotMaxMB() {
+        return Integer.parseInt(getOptional("kylin.table.snapshot.max_mb", "300"));
+    }
+
+    public int getHBaseRegionCutMin() {
+        return Integer.parseInt(getOptional("kylin.job.hbase.region.cut.min", "2"));
+    }
+
+    public int getHBaseRegionCutMax() {
+        return Integer.parseInt(getOptional("kylin.job.hbase.region.cut.max", "1000"));
+    }
+
+    public int getScanThreshold() {
+        return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
+    }
+
+    public boolean getQueryRunLocalCoprocessor() {
+        return Boolean.parseBoolean(getOptional("kylin.query.run.local.coprocessor", "false"));
+    }
+
+    public Long getQueryDurationCacheThreshold() {
+        return Long.parseLong(this.getOptional("kylin.query.cache.threshold.duration", String.valueOf(2000)));
+    }
+
+    public Long getQueryScanCountCacheThreshold() {
+        return Long.parseLong(this.getOptional("kylin.query.cache.threshold.scancount", String.valueOf(10 * 1024)));
+    }
+
+    public boolean isQuerySecureEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.query.security.enabled", "false"));
+    }
+
+    public boolean isQueryCacheEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.query.cache.enabled", "true"));
+    }
+
+    public int getHBaseKeyValueSize() {
+        return Integer.parseInt(this.getOptional("kylin.hbase.client.keyvalue.maxsize", "10485760"));
+    }
+
+    public boolean isCubingInMem() {
+        return Boolean.parseBoolean(this.getOptional(KYLIN_JOB_CUBING_IN_MEM, "false"));
+    }
+
+    /**
+     * Returns the in-memory cubing sampling percentage, defaulting to 5 and
+     * clamped to the range 1..100.
+     */
+    public int getCubingInMemSamplingPercent() {
+        final int configured = Integer.parseInt(this.getOptional(KYLIN_JOB_CUBING_IN_MEM_SAMPLING_PERCENT, "5"));
+        // clamp: at least 1, at most 100
+        return Math.min(Math.max(configured, 1), 100);
+    }
+
+    private String getOptional(String prop) {
+        final String property = System.getProperty(prop);
+        return property != null ? property : kylinConfig.getString(prop);
+    }
+
+    private String[] getOptionalStringArray(String prop) {
+        final String property = System.getProperty(prop);
+        if (!StringUtils.isBlank(property))
+            return property.split("\\s*,\\s*");
+
+        return kylinConfig.getStringArray(prop);
+    }
+
+    private String getOptional(String prop, String dft) {
+        final String property = System.getProperty(prop);
+        return property != null ? property : kylinConfig.getString(prop, dft);
+    }
+
+    /**
+     * Returns a mandatory property. A JVM system property of the same name
+     * takes precedence over the loaded configuration.
+     *
+     * @param prop the property key
+     * @return the property value
+     * @throws IllegalArgumentException if the property is missing or empty in
+     *         the loaded configuration and no system property overrides it
+     */
+    private String getRequired(String prop) {
+        final String property = System.getProperty(prop);
+        if (property != null) {
+            return property;
+        }
+        String r = kylinConfig.getString(prop);
+        if (StringUtils.isEmpty(r)) {
+            // name the properties file this class actually loads
+            throw new IllegalArgumentException("missing '" + prop + "' in " + KYLIN_CONF_PROPERTIES_FILE);
+        }
+        return r;
+    }
+
+    /**
+     * Replaces this instance's properties with those read from the given
+     * stream, then refreshes the cached metadata and storage URLs.
+     * The stream is closed before this method returns, whether or not the
+     * load succeeds.
+     *
+     * @param is the stream to read properties from
+     * @throws RuntimeException if the properties cannot be loaded
+     */
+    void reloadKylinConfig(InputStream is) {
+        PropertiesConfiguration config = new PropertiesConfiguration();
+        try {
+            config.load(is);
+        } catch (ConfigurationException e) {
+            throw new RuntimeException("Cannot load kylin config.", e);
+        } finally {
+            try {
+                is.close();
+            } catch (IOException e) {
+                // a failed close is logged only; it does not fail the reload
+                logger.error("Failed to close inputstream.", e);
+            }
+        }
+        this.kylinConfig = config;
+        this.metadataUrl = getOptional(KYLIN_METADATA_URL);
+        this.storageUrl = getOptional(KYLIN_STORAGE_URL);
+    }
+
+    public void writeProperties(File file) throws IOException {
+        try {
+            kylinConfig.save(file);
+        } catch (ConfigurationException ex) {
+            throw new IOException("Error writing KylinConfig to " + file, ex);
+        }
+    }
+
+    /**
+     * Reads the KYLIN_HOME environment variable, logging a warning when it is
+     * not set.
+     *
+     * @return the value of KYLIN_HOME, possibly null or empty
+     */
+    public static String getKylinHome() {
+        final String home = System.getenv(KYLIN_HOME);
+        if (StringUtils.isEmpty(home)) {
+            logger.warn("KYLIN_HOME was not set");
+        }
+        return home;
+    }
+
+    public void printProperties() throws IOException {
+        try {
+            kylinConfig.save(System.out);
+        } catch (ConfigurationException ex) {
+            throw new IOException("Error printing KylinConfig", ex);
+        }
+    }
+
+    /**
+     * Resolves the kylin.properties file location. The directory named by the
+     * KYLIN_CONF system property is used when set; otherwise the "conf"
+     * directory under the KYLIN_HOME environment variable is used.
+     *
+     * @return the kylin.properties file handle (existence is not checked here)
+     * @throws RuntimeException if neither KYLIN_CONF nor KYLIN_HOME is set
+     */
+    private static File getKylinProperties() {
+        // note: KYLIN_CONF is read as a JVM system property, not an env variable
+        String kylinConfHome = System.getProperty(KYLIN_CONF);
+        if (!StringUtils.isEmpty(kylinConfHome)) {
+            logger.info("Use KYLIN_CONF=" + kylinConfHome);
+            return getKylinPropertiesFile(kylinConfHome);
+        }
+
+        logger.warn("KYLIN_CONF property was not set, will seek KYLIN_HOME env variable");
+
+        String kylinHome = getKylinHome();
+        if (StringUtils.isEmpty(kylinHome))
+            throw new RuntimeException("Didn't find KYLIN_CONF or KYLIN_HOME, please set one of them");
+
+        String path = kylinHome + File.separator + "conf";
+        return getKylinPropertiesFile(path);
+
+    }
+
+    /**
+     * Opens the located kylin.properties as a stream. When a sibling file
+     * named "kylin.properties.override" exists, both files are loaded, the
+     * override properties are copied onto the base ones, and the merged
+     * result is returned as an in-memory stream.
+     *
+     * @return a stream over the (possibly merged) properties
+     * @throws RuntimeException if kylin.properties cannot be located or the
+     *         files cannot be loaded
+     */
+    public static InputStream getKylinPropertiesAsInputSteam() {
+        File propFile = getKylinProperties();
+        if (propFile == null || !propFile.exists()) {
+            logger.error("fail to locate kylin.properties");
+            throw new RuntimeException("fail to locate kylin.properties");
+        }
+
+        // optional override file sits next to kylin.properties
+        File overrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override");
+        if (overrideFile.exists()) {
+            try {
+                PropertiesConfiguration conf = new PropertiesConfiguration();
+                conf.load(propFile);
+                PropertiesConfiguration override = new PropertiesConfiguration();
+                override.load(overrideFile);
+                conf.copy(override);
+                // serialize the merged config so callers still receive a stream
+                ByteArrayOutputStream bout = new ByteArrayOutputStream();
+                conf.save(bout);
+                return new ByteArrayInputStream(bout.toByteArray());
+            } catch (ConfigurationException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        try {
+            return new FileInputStream(propFile);
+        } catch (FileNotFoundException e) {
+            // existence was checked above, so this is not expected
+            logger.error("this should not happen");
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    /**
+     * Check if there is kylin.properties exist
+     *
+     * @param path
+     * @return the properties file
+     */
+    private static File getKylinPropertiesFile(String path) {
+        if (path == null) {
+            return null;
+        }
+
+        return new File(path, KYLIN_CONF_PROPERTIES_FILE);
+    }
+
+    public String getMetadataUrl() {
+        return metadataUrl;
+    }
+
+    /**
+     * Returns the part of the metadata URL before the first '@' when the URL
+     * contains "@hbase" (case-insensitive); otherwise "kylin_metadata".
+     */
+    public String getMetadataUrlPrefix() {
+        final String fallback = "kylin_metadata";
+        final String url = getMetadataUrl();
+
+        if (!org.apache.commons.lang3.StringUtils.containsIgnoreCase(url, "@hbase")) {
+            return fallback;
+        }
+        final int at = url.indexOf('@');
+        if (at < 0) {
+            return fallback;
+        }
+        return url.substring(0, at);
+    }
+
+    public void setMetadataUrl(String metadataUrl) {
+        kylinConfig.setProperty(KYLIN_METADATA_URL, metadataUrl);
+        this.metadataUrl = metadataUrl;
+    }
+
+    public void setStorageUrl(String storageUrl) {
+        kylinConfig.setProperty(KYLIN_STORAGE_URL, storageUrl);
+        this.storageUrl = storageUrl;
+    }
+
+    public void setRunAsRemoteCommand(String v) {
+        kylinConfig.setProperty(KYLIN_JOB_RUN_AS_REMOTE_CMD, v);
+    }
+
+    public void setRemoteHadoopCliHostname(String v) {
+        kylinConfig.setProperty(KYLIN_JOB_REMOTE_CLI_HOSTNAME, v);
+    }
+
+    public void setRemoteHadoopCliUsername(String v) {
+        kylinConfig.setProperty(KYLIN_JOB_REMOTE_CLI_USERNAME, v);
+    }
+
+    public void setRemoteHadoopCliPassword(String v) {
+        kylinConfig.setProperty(KYLIN_JOB_REMOTE_CLI_PASSWORD, v);
+    }
+
+    public String getProperty(String key, String defaultValue) {
+        return kylinConfig.getString(key, defaultValue);
+    }
+
+    /**
+     * Set a new key:value into the kylin config.
+     *
+     * @param key
+     * @param value
+     */
+    public void setProperty(String key, String value) {
+        logger.info("Kylin Config was updated with " + key + " : " + value);
+        kylinConfig.setProperty(key, value);
+    }
+
+    public String getConfigAsString() throws IOException {
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            kylinConfig.save(baos);
+            String content = baos.toString();
+            return content;
+        } catch (ConfigurationException ex) {
+            throw new IOException("Error writing KylinConfig to String", ex);
+        }
+    }
+
+    public String getSparkHome() {
+        return kylinConfig.getString(SPARK_HOME);
+    }
+    public String getSparkMaster() {
+        return kylinConfig.getString(SPARK_MASTER);
+    }
+
+    /**
+     * Returns the configured HBase region cut for a cube capacity level.
+     * Defaults are 5 for SMALL, 10 for MEDIUM and 50 for LARGE.
+     *
+     * @param capacity one of "SMALL", "MEDIUM" or "LARGE" (case-sensitive)
+     * @return the region cut for that capacity
+     * @throws IllegalArgumentException if capacity is null or not one of the
+     *         recognized values
+     * @throws NumberFormatException if the configured value is not an integer
+     */
+    public int getHBaseRegionCut(String capacity) {
+        if (capacity == null) {
+            // a switch on a null string would fail with a bare NullPointerException
+            throw new IllegalArgumentException("Capacity not recognized: " + capacity);
+        }
+        String cut;
+        switch (capacity) {
+        case "SMALL":
+            cut = getProperty(HBASE_REGION_CUT_SMALL, "5");
+            break;
+        case "MEDIUM":
+            cut = getProperty(HBASE_REGION_CUT_MEDIUM, "10");
+            break;
+        case "LARGE":
+            cut = getProperty(HBASE_REGION_CUT_LARGE, "50");
+            break;
+        default:
+            throw new IllegalArgumentException("Capacity not recognized: " + capacity);
+        }
+
+        // parseInt yields the primitive directly, avoiding a box/unbox round trip
+        return Integer.parseInt(cut);
+    }
+
+    public String toString() {
+        return getMetadataUrl();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
new file mode 100644
index 0000000..186fd42
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
@@ -0,0 +1,10 @@
+package org.apache.kylin.common.cache;
+
+import org.apache.kylin.common.restclient.AbstractRestCache;
+import org.apache.kylin.common.restclient.Broadcaster;
+
+/**
+ * Callback that applies a single cache change, described by a key/value pair together with a
+ * {@link Broadcaster.EVENT} and a {@link Broadcaster.TYPE}.
+ */
+public interface CacheUpdater {
+    /**
+     * Applies one cache change.
+     *
+     * @param key        key of the affected cache entry
+     * @param value      value associated with {@code key}
+     * @param syncAction the event describing what happened to the entry
+     * @param type       the broadcast type the entry belongs to
+     * @param cache      the cache the change relates to
+     */
+    void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
new file mode 100644
index 0000000..8f629e3
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
@@ -0,0 +1,17 @@
+package org.apache.kylin.common.cache;
+
+import org.apache.kylin.common.restclient.AbstractRestCache;
+import org.apache.kylin.common.restclient.Broadcaster;
+
+/**
+ * {@link CacheUpdater} that applies a change directly to the local entries of the given cache.
+ */
+public class LocalCacheUpdater implements CacheUpdater {
+    /**
+     * Puts the entry locally on {@code CREATE} or {@code UPDATE}, removes it locally on
+     * {@code DROP}, and does nothing for any other event. The {@code type} argument is not used.
+     */
+    @Override
+    public void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache) {
+        final boolean isUpsert = syncAction == Broadcaster.EVENT.CREATE || syncAction == Broadcaster.EVENT.UPDATE;
+        if (isUpsert) {
+            cache.putLocal(key, value);
+            return;
+        }
+        if (syncAction == Broadcaster.EVENT.DROP) {
+            cache.removeLocal(key);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
new file mode 100644
index 0000000..2a5e59a
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
@@ -0,0 +1,13 @@
+package org.apache.kylin.common.cache;
+
+import org.apache.kylin.common.restclient.AbstractRestCache;
+import org.apache.kylin.common.restclient.Broadcaster;
+
+/**
+ * {@link CacheUpdater} that hands the change to the {@link Broadcaster} queue instead of
+ * touching the given cache.
+ */
+public class RemoteCacheUpdater implements CacheUpdater {
+    /**
+     * Queues the change on {@code Broadcaster.getInstance()}, passing {@code type.getType()},
+     * {@code syncAction.getType()} and {@code key.toString()}.
+     * <p>
+     * The {@code value} and {@code cache} arguments are not used here. {@code key},
+     * {@code syncAction} and {@code type} must be non-null because methods are invoked on them.
+     */
+    @Override
+    public void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache) {
+        Broadcaster.getInstance().queue(type.getType(), syncAction.getType(), key.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
new file mode 100644
index 0000000..560a836
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.debug;
+
+import java.util.Map;
+
+/**
+ * Per-thread debug switches ("backdoor toggles") looked up by name.
+ * <p>
+ * The toggle map is held in a {@link ThreadLocal}: install it with {@link #setToggles(Map)} and
+ * discard it with {@link #cleanToggles()}. When no map is installed, string toggles read as
+ * {@code null} and boolean toggles read as {@code false}.
+ */
+public class BackdoorToggles {
+
+    /**
+     * set DEBUG_TOGGLE_DISABLE_FUZZY_KEY=true to disable fuzzy key for debug/profile usage
+     * <pre>
+     * example:(put it into request body)
+     * "backdoorToggles": {
+     *    "DEBUG_TOGGLE_DISABLE_FUZZY_KEY": "true"
+     * }
+     * </pre>
+     */
+    public final static String DEBUG_TOGGLE_DISABLE_FUZZY_KEY = "DEBUG_TOGGLE_DISABLE_FUZZY_KEY";
+
+    /**
+     * set DEBUG_TOGGLE_OBSERVER_BEHAVIOR=SCAN/SCAN_FILTER/SCAN_FILTER_AGGR to control observer
+     * behavior for debug/profile usage
+     * <pre>
+     * example:(put it into request body)
+     * "backdoorToggles": {
+     *    "DEBUG_TOGGLE_OBSERVER_BEHAVIOR": "SCAN"
+     * }
+     * </pre>
+     */
+    public final static String DEBUG_TOGGLE_OBSERVER_BEHAVIOR = "DEBUG_TOGGLE_OBSERVER_BEHAVIOR";
+
+    /**
+     * set DEBUG_TOGGLE_LOCAL_COPROCESSOR=true to run coprocessor at client side (not in HBase
+     * region server)
+     * <pre>
+     * example:(put it into request body)
+     * "backdoorToggles": {
+     *    "DEBUG_TOGGLE_LOCAL_COPROCESSOR": "true"
+     * }
+     * </pre>
+     */
+    public final static String DEBUG_TOGGLE_LOCAL_COPROCESSOR = "DEBUG_TOGGLE_LOCAL_COPROCESSOR";
+
+    /** Toggle map of the current thread; unset until {@link #setToggles(Map)} is called. */
+    private static final ThreadLocal<Map<String, String>> TOGGLES_OF_THREAD = new ThreadLocal<Map<String, String>>();
+
+    /** Installs {@code toggles} as the toggle map of the calling thread. */
+    public static void setToggles(Map<String, String> toggles) {
+        TOGGLES_OF_THREAD.set(toggles);
+    }
+
+    /** Removes the toggle map of the calling thread. */
+    public static void cleanToggles() {
+        TOGGLES_OF_THREAD.remove();
+    }
+
+    /** @return the value of {@link #DEBUG_TOGGLE_OBSERVER_BEHAVIOR}, or {@code null} if absent */
+    public static String getObserverBehavior() {
+        return getString(DEBUG_TOGGLE_OBSERVER_BEHAVIOR);
+    }
+
+    /** @return {@code true} only if {@link #DEBUG_TOGGLE_DISABLE_FUZZY_KEY} is exactly "true" */
+    public static boolean getDisableFuzzyKey() {
+        return getBoolean(DEBUG_TOGGLE_DISABLE_FUZZY_KEY);
+    }
+
+    /** @return {@code true} only if {@link #DEBUG_TOGGLE_LOCAL_COPROCESSOR} is exactly "true" */
+    public static boolean getRunLocalCoprocessor() {
+        return getBoolean(DEBUG_TOGGLE_LOCAL_COPROCESSOR);
+    }
+
+    /** Reads a toggle of the current thread; {@code null} when no map is installed. */
+    private static String getString(String key) {
+        final Map<String, String> current = TOGGLES_OF_THREAD.get();
+        return current == null ? null : current.get(key);
+    }
+
+    /** A toggle counts as set only when its value is the lowercase string "true". */
+    private static boolean getBoolean(String key) {
+        final String raw = getString(key);
+        return "true".equals(raw);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
new file mode 100644
index 0000000..50babd9
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.hll;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+import org.apache.kylin.common.util.BytesUtil;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+/**
+ * A HyperLogLog-style distinct-count estimator backed by {@code 2^p} one-byte registers.
+ * <p>
+ * About compression, test on HLLC data shows
+ * <ul>
+ * <li>LZF compression ratio is around 65%-80%, fast</li>
+ * <li>GZIP compression ratio is around 41%-46%, very slow</li>
+ * </ul>
+ *
+ * @author yangli9
+ */
+public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter> {
+
+    /** Serialization scheme byte: only non-zero registers are written as (index, value) pairs. */
+    private static final byte SCHEME_MAP = 0;
+    /** Serialization scheme byte: all {@code m} registers are written as a plain array. */
+    private static final byte SCHEME_ARRAY = 1;
+
+    /** Precision: number of hash bits used to select a register. */
+    private final int p;
+    /** Number of registers, always {@code 1 << p}. */
+    private final int m;
+    private final HashFunction hashFunc;
+    byte[] registers;
+
+    /** Creates an empty counter with precision {@code p = 10}. */
+    public HyperLogLogPlusCounter() {
+        this(10);
+    }
+
+    /**
+     * Creates an empty counter with {@code 2^p} registers, hashing with
+     * {@code Hashing.murmur3_128()}.
+     *
+     * @param p precision; see {@link #getErrorRate()} for the resulting error rate
+     */
+    public HyperLogLogPlusCounter(int p) {
+        this(p, Hashing.murmur3_128());
+    }
+
+    /**
+     * Copy constructor: creates a counter with the same precision and hash function as
+     * {@code another} and merges its registers in.
+     */
+    public HyperLogLogPlusCounter(HyperLogLogPlusCounter another) {
+        this(another.p, another.hashFunc);
+        merge(another);
+    }
+
+    /** The larger p is, the more storage (2^p bytes), the better accuracy */
+    private HyperLogLogPlusCounter(int p, HashFunction hashFunc) {
+        this.p = p;
+        this.m = 1 << p;
+        this.hashFunc = hashFunc;
+        this.registers = new byte[m];
+    }
+
+    /** Resets every register to zero. */
+    public void clear() {
+        Arrays.fill(registers, (byte) 0);
+    }
+
+    /** Adds an int value to the counter. */
+    public void add(int value) {
+        add(hashFunc.hashInt(value).asLong());
+    }
+
+    /**
+     * Adds a string value to the counter.
+     * <p>
+     * NOTE(review): the string is hashed using {@link Charset#defaultCharset()}, so the hash of a
+     * non-ASCII string depends on the JVM's platform charset. Left unchanged here because a
+     * different charset would change the hashes of already-counted data; confirm whether a
+     * fixed charset is wanted.
+     */
+    public void add(String value) {
+        add(hashFunc.hashString(value, Charset.defaultCharset()).asLong());
+    }
+
+    /** Adds the whole byte array as one value. */
+    public void add(byte[] value) {
+        add(hashFunc.hashBytes(value).asLong());
+    }
+
+    /** Adds {@code length} bytes of {@code value}, starting at {@code offset}, as one value. */
+    public void add(byte[] value, int offset, int length) {
+        add(hashFunc.hashBytes(value, offset, length).asLong());
+    }
+
+    /**
+     * Records a 64-bit hash. The low {@code p} bits select the register; the register keeps the
+     * largest "first one position" seen, i.e. 1 + the number of leading zeros of the hash.
+     */
+    protected void add(long hash) {
+        int bucketMask = m - 1;
+        int bucket = (int) (hash & bucketMask);
+        // OR-ing in the bucket bits caps the leading-zero count at 64 - p,
+        // so a register value never exceeds 65 - p.
+        int firstOnePos = Long.numberOfLeadingZeros(hash | bucketMask) + 1;
+
+        if (firstOnePos > registers[bucket]) {
+            registers[bucket] = (byte) firstOnePos;
+        }
+    }
+
+    /**
+     * Merges {@code another} into this counter by taking the per-register maximum.
+     * Both counters are expected to share precision and hash function; this is only checked
+     * with {@code assert}, i.e. when assertions are enabled.
+     */
+    public void merge(HyperLogLogPlusCounter another) {
+        assert this.p == another.p;
+        assert this.hashFunc == another.hashFunc;
+
+        for (int i = 0; i < m; i++) {
+            if (registers[i] < another.registers[i]) {
+                registers[i] = another.registers[i];
+            }
+        }
+    }
+
+    /** @return the estimated number of distinct values added so far */
+    public long getCountEstimate() {
+        return new HLLCSnapshot(this).getCountEstimate();
+    }
+
+    /** @return {@code 1.04 / sqrt(m)} for this counter's register count {@code m} */
+    public double getErrorRate() {
+        return 1.04 / Math.sqrt(m);
+    }
+
+    /** Counts the registers holding a value greater than zero. */
+    private int size() {
+        int nonZero = 0;
+        for (int i = 0; i < m; i++) {
+            if (registers[i] > 0) {
+                nonZero++;
+            }
+        }
+        return nonZero;
+    }
+
+    /** @return the count estimate rendered as a decimal string */
+    @Override
+    public String toString() {
+        return "" + getCountEstimate();
+    }
+
+    // ============================================================================
+
+    /**
+     * A memory efficient snapshot of HLL registers which can yield count estimate later.
+     * Only the precision, the harmonic sum of the registers and the number of zero registers
+     * are retained.
+     */
+    public static class HLLCSnapshot {
+        byte p;
+        double registerSum;
+        int zeroBuckets;
+
+        /** Captures the state of {@code hllc} needed to estimate its count. */
+        public HLLCSnapshot(HyperLogLogPlusCounter hllc) {
+            p = (byte) hllc.p;
+            registerSum = 0;
+            zeroBuckets = 0;
+
+            byte[] registers = hllc.registers;
+            for (int i = 0; i < hllc.m; i++) {
+                if (registers[i] == 0) {
+                    registerSum++;
+                    zeroBuckets++;
+                } else {
+                    // Register values can reach 65 - p (well above 31), so the shift must be
+                    // done on a long: an int shift only uses the low 5 bits of the distance
+                    // and would produce a wrong (or negative) power of two.
+                    registerSum += 1.0 / (1L << registers[i]);
+                }
+            }
+        }
+
+        /** @return the estimated distinct count for the captured registers, rounded to a long */
+        public long getCountEstimate() {
+            int m = 1 << p;
+            double alpha = 1 / (2 * Math.log(2) * (1 + (3 * Math.log(2) - 1) / m));
+            double alphaMM = alpha * m * m;
+            double estimate = alphaMM / registerSum;
+
+            // small cardinality adjustment
+            if (zeroBuckets >= m * 0.07) { // (reference presto's HLL impl)
+                estimate = m * Math.log(m * 1.0 / zeroBuckets);
+            } else if (HyperLogLogPlusTable.isBiasCorrection(m, estimate)) {
+                estimate = HyperLogLogPlusTable.biasCorrection(p, estimate);
+            }
+
+            return Math.round(estimate);
+        }
+    }
+
+    // ============================================================================
+
+    /**
+     * Writes the registers to {@code out}, prefixed by a one-byte scheme marker.
+     * <p>
+     * The map scheme (marker 0) writes a vint count followed by (index, value) pairs for the
+     * non-zero registers; the array scheme (marker 1) writes all {@code m} registers. The map
+     * scheme is chosen only when its worst-case size is smaller than {@code m}.
+     *
+     * @param out destination buffer; needs at most {@link #maxLength()} bytes remaining
+     * @throws IOException declared for callers; not thrown by the code in this method itself
+     */
+    public void writeRegisters(final ByteBuffer out) throws IOException {
+        final int indexLen = getRegisterIndexSize();
+        final int size = size();
+
+        // decide output scheme -- map ((indexLen + 1) * size bytes plus a vint) or array (2^p bytes)
+        final boolean useMap = 5 + (indexLen + 1) * size < m; // 5 is max len of vint
+        final byte scheme = useMap ? SCHEME_MAP : SCHEME_ARRAY;
+        out.put(scheme);
+
+        if (useMap) {
+            BytesUtil.writeVInt(size, out);
+            for (int i = 0; i < m; i++) {
+                if (registers[i] > 0) {
+                    writeUnsigned(i, indexLen, out);
+                    out.put(registers[i]);
+                }
+            }
+        } else {
+            out.put(registers);
+        }
+    }
+
+    /**
+     * Replaces this counter's registers with the ones serialized by
+     * {@link #writeRegisters(ByteBuffer)}.
+     *
+     * @param in source buffer positioned at the scheme marker
+     * @throws IllegalArgumentException if a map-scheme entry count exceeds {@code m}
+     * @throws IllegalStateException    if the scheme marker is neither 0 nor 1
+     * @throws IOException              declared for callers; not thrown by the code in this method itself
+     */
+    public void readRegisters(ByteBuffer in) throws IOException {
+        byte scheme = in.get();
+
+        if (scheme == SCHEME_MAP) {
+            clear();
+            int size = BytesUtil.readVInt(in);
+            if (size > m)
+                throw new IllegalArgumentException("register size (" + size + ") cannot be larger than m (" + m + ")");
+            int indexLen = getRegisterIndexSize();
+            for (int i = 0; i < size; i++) {
+                int key = readUnsigned(in, indexLen);
+                registers[key] = in.get();
+            }
+        } else if (scheme == SCHEME_ARRAY) {
+            in.get(registers);
+        } else {
+            throw new IllegalStateException("Unknown register serialization scheme: " + scheme);
+        }
+    }
+
+    /**
+     * Computes the serialized length of the counter that starts at the current position of
+     * {@code in}, without consuming it: the buffer position is restored before returning.
+     * Any scheme marker other than 0 is sized as the array scheme.
+     */
+    public int peekLength(ByteBuffer in) {
+        int mark = in.position();
+        int len;
+
+        byte scheme = in.get();
+        if (scheme == SCHEME_MAP) {
+            int size = BytesUtil.readVInt(in);
+            int indexLen = getRegisterIndexSize();
+            len = in.position() - mark + (indexLen + 1) * size;
+        } else {
+            len = in.position() - mark + m;
+        }
+
+        in.position(mark);
+        return len;
+    }
+
+    /** @return {@code 1 + m}: the scheme byte plus a full register array */
+    public int maxLength() {
+        return 1 + m;
+    }
+
+    /** Writes all registers to {@code out} without a scheme marker. */
+    public void writeRegistersArray(final ByteBuffer out) {
+        out.put(this.registers);
+    }
+
+    /** Reads {@code m} bytes from {@code in} into the registers; no scheme marker is expected. */
+    public void readRegistersArray(ByteBuffer in) {
+        in.get(registers, 0, m);
+    }
+
+    /** Number of bytes used to store a register index in the map scheme. */
+    private int getRegisterIndexSize() {
+        return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((hashFunc == null) ? 0 : hashFunc.hashCode());
+        result = prime * result + p;
+        result = prime * result + Arrays.hashCode(registers);
+        return result;
+    }
+
+    /** Two counters are equal when hash function, precision and all registers are equal. */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        HyperLogLogPlusCounter other = (HyperLogLogPlusCounter) obj;
+        if (hashFunc == null) {
+            if (other.hashFunc != null)
+                return false;
+        } else if (!hashFunc.equals(other.hashFunc))
+            return false;
+        if (p != other.p)
+            return false;
+        return Arrays.equals(registers, other.registers);
+    }
+
+    /**
+     * Orders counters by their count estimate; a {@code null} argument sorts before any counter.
+     * Note that counters with different registers may compare as 0 while not being
+     * {@link #equals(Object) equal}.
+     */
+    @Override
+    public int compareTo(HyperLogLogPlusCounter o) {
+        if (o == null)
+            return 1;
+
+        long e1 = this.getCountEstimate();
+        long e2 = o.getCountEstimate();
+
+        if (e1 == e2)
+            return 0;
+        return e1 > e2 ? 1 : -1;
+    }
+
+    /** Prints the error-rate table produced by {@link #dumpErrorRates()}. */
+    public static void main(String[] args) throws IOException {
+        dumpErrorRates();
+    }
+
+    /** Prints, for p = 10..18, the storage size and the 1x/2x/3x error rates in percent. */
+    static void dumpErrorRates() {
+        for (int p = 10; p <= 18; p++) {
+            double rate = new HyperLogLogPlusCounter(p).getErrorRate();
+            double er = Math.round(rate * 10000) / 100D;
+            double er2 = Math.round(rate * 2 * 10000) / 100D;
+            double er3 = Math.round(rate * 3 * 10000) / 100D;
+            long size = Math.round(Math.pow(2, p));
+            System.out.println("HLLC" + p + ",\t" + size + " bytes,\t68% err<" + er + "%" + ",\t95% err<" + er2 + "%" + ",\t99.7% err<" + er3 + "%");
+        }
+    }
+
+    /**
+     * Writes the low {@code size} bytes of {@code num} to {@code out}, least significant byte
+     * first.
+     *
+     * @param num  value to write
+     * @param size number of bytes to write
+     * @param out  destination buffer
+     */
+    public static void writeUnsigned(int num, int size, ByteBuffer out) {
+        for (int i = 0; i < size; i++) {
+            out.put((byte) num);
+            num >>>= 8;
+        }
+    }
+
+    /**
+     * Reads {@code size} bytes from {@code in} and assembles them into an int, least significant
+     * byte first (the inverse of {@link #writeUnsigned(int, int, ByteBuffer)}).
+     *
+     * @param in   source buffer
+     * @param size number of bytes to read
+     * @return the assembled value
+     */
+    public static int readUnsigned(ByteBuffer in, int size) {
+        int integer = 0;
+        int mask = 0xff;
+        int shift = 0;
+        for (int i = 0; i < size; i++) {
+            // The byte is sign-extended before shifting; the mask keeps only its own 8 bits.
+            integer |= (in.get() << shift) & mask;
+            mask = mask << 8;
+            shift += 8;
+        }
+        return integer;
+    }
+}