You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/22 06:13:29 UTC
[39/47] incubator-kylin git commit: KYLIN-875 rename modules:
core-common, core-cube, core-dictionary, core-cube
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
new file mode 100644
index 0000000..cc65833
--- /dev/null
+++ b/core-common/pom.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Build descriptor for the kylin-core-common jar module; it inherits from the org.apache.kylin:kylin parent POM. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>kylin-core-common</artifactId>
+ <packaging>jar</packaging>
+ <name>Kylin:Core-Common</name>
+
+ <!-- Dependencies below that omit a version must be managed by this parent's dependencyManagement. -->
+ <parent>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin</artifactId>
+ <version>0.7.2-incubating-SNAPSHOT</version>
+ </parent>
+
+ <!-- No module-specific properties. ${hbase-hadoop2.version} and ${hive-hcatalog.version}, used below, are presumably defined in the parent POM. -->
+ <properties>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.jcraft</groupId>
+ <artifactId>jsch</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.ning</groupId>
+ <artifactId>compress-lzf</artifactId>
+ </dependency>
+
+ <!-- Env & Test -->
+ <!-- NOTE(review): hamcrest-all declares no scope, so it defaults to compile scope and ships with this module. Confirm main code needs it; otherwise it belongs in test scope. -->
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-email</artifactId>
+ <version>1.1</version>
+ </dependency>
+ <!-- Hadoop, HBase and HCatalog use provided scope: they are on the compile classpath but are expected to come from the runtime environment, so they are not bundled. -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>provided</scope>
+ <!-- protobuf version conflict with hbase -->
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <!-- Test-only HBase harness. The servlet and JSP APIs are excluded, presumably to avoid clashing with other copies on the classpath. -->
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <version>${hbase-hadoop2.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- Pinned to a release-candidate version of reflections. -->
+ <dependency>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ <version>0.9.9-RC1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>1.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-core</artifactId>
+ <version>${hive-hcatalog.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
new file mode 100644
index 0000000..89a7fcc
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.SortedSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Central Kylin configuration, backed by a commons-configuration {@link PropertiesConfiguration}
+ * loaded from kylin.properties (see {@link #getInstanceFromEnv()} and
+ * {@link #createInstanceFromUri(String)}).
+ * <p>
+ * Getters that go through {@code getOptional}/{@code getRequired}/{@code getOptionalStringArray}
+ * let a JVM system property with the same key override the file value. {@link #getProperty},
+ * {@link #getSparkHome()}, {@link #getSparkMaster()} and {@link #getHBaseRegionCut(String)} read
+ * the file configuration only.
+ *
+ * @author yangli9
+ */
+public class KylinConfig {
+
+    public static final String KYLIN_STORAGE_URL = "kylin.storage.url";
+
+    public static final String KYLIN_METADATA_URL = "kylin.metadata.url";
+
+    public static final String KYLIN_REST_SERVERS = "kylin.rest.servers";
+
+    public static final String KYLIN_REST_TIMEZONE = "kylin.rest.timezone";
+    /**
+     * The dir containing scripts for kylin. For example: /usr/lib/kylin/bin
+     */
+    public static final String KYLIN_SCRIPT_DIR = "kylin.script.dir";
+    /**
+     * The script file name for generating table metadata from hive. For example:
+     * generateTable.sh
+     */
+    public static final String KYLIN_SCRIPT_GEN_TABLE_META = "kylin.script.genTableMeta";
+
+    public static final String KYLIN_JOB_CONCURRENT_MAX_LIMIT = "kylin.job.concurrent.max.limit";
+
+    public static final String KYLIN_JOB_YARN_APP_REST_CHECK_URL = "kylin.job.yarn.app.rest.check.status.url";
+
+    public static final String KYLIN_JOB_YARN_APP_REST_CHECK_INTERVAL_SECONDS = "kylin.job.yarn.app.rest.check.interval.seconds";
+
+    public static final String KYLIN_TMP_HDFS_DIR = "kylin.tmp.hdfs.dir";
+
+    public static final String HIVE_TABLE_LOCATION_PREFIX = "hive.table.location.";
+
+    public static final String KYLIN_JOB_REMOTE_CLI_PASSWORD = "kylin.job.remote.cli.password";
+
+    public static final String KYLIN_JOB_REMOTE_CLI_USERNAME = "kylin.job.remote.cli.username";
+
+    public static final String KYLIN_JOB_REMOTE_CLI_HOSTNAME = "kylin.job.remote.cli.hostname";
+
+    public static final String KYLIN_JOB_REMOTE_CLI_PORT = "kylin.job.remote.cli.port";
+
+    public static final String KYLIN_JOB_REMOTE_CLI_WORKING_DIR = "kylin.job.remote.cli.working.dir";
+
+    public static final String KYLIN_JOB_CMD_EXTRA_ARGS = "kylin.job.cmd.extra.args";
+    /**
+     * Toggle for running CLI commands on a remote host. When "true",
+     * {@link #getCliCommandExecutor()} configures the executor with the kylin.job.remote.cli.*
+     * hostname/port/username/password settings. The default is false when unset.
+     */
+    public static final String KYLIN_JOB_RUN_AS_REMOTE_CMD = "kylin.job.run.as.remote.cmd";
+
+    public static final String KYLIN_JOB_MAPREDUCE_DEFAULT_REDUCE_COUNT_RATIO = "kylin.job.mapreduce.default.reduce.count.ratio";
+
+    public static final String KYLIN_JOB_MAPREDUCE_DEFAULT_REDUCE_INPUT_MB = "kylin.job.mapreduce.default.reduce.input.mb";
+
+    public static final String KYLIN_JOB_MAPREDUCE_MAX_REDUCER_NUMBER = "kylin.job.mapreduce.max.reducer.number";
+
+    public static final String KYLIN_JOB_JAR = "kylin.job.jar";
+
+    public static final String COPROCESSOR_LOCAL_JAR = "kylin.coprocessor.local.jar";
+
+    public static final String KYLIN_JOB_LOG_DIR = "kylin.job.log.dir";
+
+    public static final String KYLIN_JOB_CUBING_IN_MEM = "kylin.job.cubing.inMem";
+    public static final String KYLIN_JOB_CUBING_IN_MEM_SAMPLING_PERCENT = "kylin.job.cubing.inMem.sampling.percent";
+
+    public static final String KYLIN_HDFS_WORKING_DIR = "kylin.hdfs.working.dir";
+
+    public static final String HIVE_PASSWORD = "hive.password";
+
+    public static final String HIVE_USER = "hive.user";
+
+    public static final String HIVE_URL = "hive.url";
+    /**
+     * Key string to specify the kylin env: prod, dev, qa
+     */
+    public static final String KYLIN_ENV = "KYLIN_ENV";
+    /**
+     * Default Kylin conf path. NOTE: the properties lookup in this class does not consult it.
+     */
+    public static final String KYLIN_CONF_DEFAULT = "/etc/kylin";
+    /**
+     * Kylin properties file
+     */
+    public static final String KYLIN_CONF_PROPERTIES_FILE = "kylin.properties";
+
+    public static final String MAIL_ENABLED = "mail.enabled";
+
+    public static final String MAIL_HOST = "mail.host";
+
+    public static final String MAIL_USERNAME = "mail.username";
+
+    public static final String MAIL_PASSWORD = "mail.password";
+
+    public static final String MAIL_SENDER = "mail.sender";
+
+    public static final String KYLIN_HOME = "KYLIN_HOME";
+    public static final String KYLIN_CONF = "KYLIN_CONF";
+
+    public static final String HBASE_REGION_CUT_SMALL = "kylin.job.hbase.region.cut.small";
+    public static final String HBASE_REGION_CUT_MEDIUM = "kylin.job.hbase.region.cut.medium";
+    public static final String HBASE_REGION_CUT_LARGE = "kylin.job.hbase.region.cut.large";
+
+    public static final String SPARK_HOME = "kylin.spark.home";
+    public static final String SPARK_MASTER = "kylin.spark.master";
+
+    private static final Logger logger = LoggerFactory.getLogger(KylinConfig.class);
+
+    public static final String VERSION = "${project.version}";
+
+    // static cached instances
+    private static KylinConfig ENV_INSTANCE = null;
+
+    /**
+     * Returns the process-wide config, loading it on first use via {@link #loadKylinConfig()}.
+     * NOTE: the lazy initialization is not synchronized, so concurrent first calls may each load
+     * the properties, and the last assignment wins.
+     *
+     * @throws IllegalStateException if the loader reports an IllegalArgumentException
+     */
+    public static KylinConfig getInstanceFromEnv() {
+        if (ENV_INSTANCE == null) {
+            try {
+                KylinConfig config = loadKylinConfig();
+                ENV_INSTANCE = config;
+            } catch (IllegalArgumentException e) {
+                throw new IllegalStateException("Failed to find KylinConfig ", e);
+            }
+        }
+        return ENV_INSTANCE;
+    }
+
+    /**
+     * Drops the cached env instance so the next {@link #getInstanceFromEnv()} reloads it.
+     * (The misspelled name is kept for compatibility with existing callers.)
+     */
+    public static void destoryInstance() {
+        ENV_INSTANCE = null;
+    }
+
+    /** How a metadata URI passed to {@link #createInstanceFromUri(String)} is interpreted. */
+    public static enum UriType {
+        PROPERTIES_FILE, REST_ADDR, LOCAL_FOLDER
+    }
+
+    /**
+     * Classifies {@code metaUri} as a local folder, a local kylin.properties file, or a REST
+     * address. Side effect: a non-existent path containing '/' is created as a directory.
+     *
+     * @throws IllegalStateException if the URI is not recognized; the specific reason is kept
+     *                               as the cause
+     */
+    private static UriType decideUriType(String metaUri) {
+        try {
+            File file = new File(metaUri);
+            if (file.exists() || metaUri.contains("/")) {
+                // NOTE(review): this also turns a missing ".../kylin.properties" path into a
+                // directory, which is then classified as LOCAL_FOLDER.
+                if (!file.exists()) {
+                    file.mkdirs();
+                }
+                if (file.isDirectory()) {
+                    return UriType.LOCAL_FOLDER;
+                } else if (file.isFile()) {
+                    if (file.getName().equalsIgnoreCase(KYLIN_CONF_PROPERTIES_FILE)) {
+                        return UriType.PROPERTIES_FILE;
+                    } else {
+                        throw new IllegalStateException("Metadata uri : " + metaUri + " is a local file but not kylin.properties");
+                    }
+                } else {
+                    throw new IllegalStateException("Metadata uri : " + metaUri + " looks like a file but it's neither a file nor a directory");
+                }
+            } else {
+                if (RestClient.matchFullRestPattern(metaUri))
+                    return UriType.REST_ADDR;
+                else
+                    throw new IllegalStateException("Metadata uri : " + metaUri + " is not a valid REST URI address");
+            }
+        } catch (Exception e) {
+            logger.info(e.getLocalizedMessage());
+            // chain the original exception so the specific reason is not lost
+            throw new IllegalStateException("Metadata uri : " + metaUri + " is not recognized", e);
+        }
+    }
+
+    /**
+     * Builds a config from a metadata URI.
+     * <ul>
+     * <li>LOCAL_FOLDER: an empty config whose metadata URL is the folder path.</li>
+     * <li>PROPERTIES_FILE: properties loaded from the given kylin.properties.</li>
+     * <li>REST_ADDR: properties fetched from a REST server (format user:password@host:port).</li>
+     * </ul>
+     *
+     * @throws IllegalStateException if the URI is not recognized
+     * @throws RuntimeException      wrapping any IOException while reading the properties
+     */
+    public static KylinConfig createInstanceFromUri(String uri) {
+        UriType uriType = decideUriType(uri);
+        logger.info("The URI " + uri + " is recognized as " + uriType);
+
+        if (uriType == UriType.LOCAL_FOLDER) {
+            KylinConfig config = new KylinConfig();
+            config.setMetadataUrl(uri);
+            return config;
+        } else if (uriType == UriType.PROPERTIES_FILE) {
+            KylinConfig config = new KylinConfig();
+            try (InputStream is = new FileInputStream(uri)) {
+                config.reloadKylinConfig(is);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            return config;
+        } else { // REST_ADDR
+            try {
+                KylinConfig config = new KylinConfig();
+                RestClient client = new RestClient(uri);
+                String propertyText = client.getKylinProperties();
+                try (InputStream is = IOUtils.toInputStream(propertyText)) {
+                    config.reloadKylinConfig(is);
+                }
+                return config;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /** Builds a config from {@code is}. The stream is closed afterwards. */
+    public static KylinConfig getKylinConfigFromInputStream(InputStream is) {
+        KylinConfig config = new KylinConfig();
+        config.reloadKylinConfig(is);
+        return config;
+    }
+
+    // ============================================================================
+
+    /**
+     * Finds the config from the environment. The search order is:
+     * 1. {@code $KYLIN_CONF/kylin.properties}, where KYLIN_CONF is read as a JVM system property;
+     * 2. {@code $KYLIN_HOME/conf/kylin.properties}, where KYLIN_HOME is read as an environment
+     * variable.
+     * If a sibling {@code kylin.properties.override} exists, its entries are merged on top.
+     *
+     * @return the loaded config
+     */
+    private static KylinConfig loadKylinConfig() {
+        InputStream is = getKylinPropertiesAsInputSteam();
+        if (is == null) {
+            throw new IllegalArgumentException("Failed to load kylin config");
+        }
+        KylinConfig config = new KylinConfig();
+        config.reloadKylinConfig(is);
+        return config;
+    }
+
+    private PropertiesConfiguration kylinConfig = new PropertiesConfiguration();
+
+    private String metadataUrl;
+    private String storageUrl;
+
+    /**
+     * Returns a CLI executor. It is configured for the remote host when
+     * {@link #KYLIN_JOB_RUN_AS_REMOTE_CMD} is true.
+     */
+    public CliCommandExecutor getCliCommandExecutor() throws IOException {
+        CliCommandExecutor exec = new CliCommandExecutor();
+        if (getRunAsRemoteCommand()) {
+            exec.setRunAtRemote(getRemoteHadoopCliHostname(), getRemoteHadoopCliPort(), getRemoteHadoopCliUsername(), getRemoteHadoopCliPassword());
+        }
+        return exec;
+    }
+
+    // ============================================================================
+
+    public String getStorageUrl() {
+        return storageUrl;
+    }
+
+    public String getHiveUrl() {
+        return getOptional(HIVE_URL, "");
+    }
+
+    public String getHiveUser() {
+        return getOptional(HIVE_USER, "");
+    }
+
+    public String getHivePassword() {
+        return getOptional(HIVE_PASSWORD, "");
+    }
+
+    /**
+     * Returns the required kylin.hdfs.working.dir, with a trailing '/' ensured, followed by
+     * {@link #getMetadataUrlPrefix()}.
+     */
+    public String getHdfsWorkingDirectory() {
+        String root = getRequired(KYLIN_HDFS_WORKING_DIR);
+        if (!root.endsWith("/")) {
+            root += "/";
+        }
+        return root + getMetadataUrlPrefix();
+    }
+
+    public String getKylinJobLogDir() {
+        return getOptional(KYLIN_JOB_LOG_DIR, "/tmp/kylin/logs");
+    }
+
+    /**
+     * Returns the job jar path. An explicit kylin.job.jar setting wins. Otherwise the last
+     * matching kylin-job-*.jar under $KYLIN_HOME/lib is used, or "" when KYLIN_HOME is unset.
+     */
+    public String getKylinJobJarPath() {
+        final String jobJar = getOptional(KYLIN_JOB_JAR);
+        if (StringUtils.isNotEmpty(jobJar)) {
+            return jobJar;
+        }
+        String kylinHome = getKylinHome();
+        if (StringUtils.isEmpty(kylinHome)) {
+            return "";
+        }
+        return getFileName(kylinHome + File.separator + "lib", JOB_JAR_NAME_PATTERN);
+    }
+
+    /** Overrides the job jar path process-wide, via a JVM system property. */
+    public void overrideKylinJobJarPath(String path) {
+        logger.info("override " + KYLIN_JOB_JAR + " to " + path);
+        System.setProperty(KYLIN_JOB_JAR, path);
+    }
+
+    private static final Pattern COPROCESSOR_JAR_NAME_PATTERN = Pattern.compile("kylin-coprocessor-(.+)\\.jar");
+    private static final Pattern JOB_JAR_NAME_PATTERN = Pattern.compile("kylin-job-(.+)\\.jar");
+
+    /**
+     * Returns the coprocessor jar path. An explicit kylin.coprocessor.local.jar setting wins.
+     * Otherwise the last matching kylin-coprocessor-*.jar under $KYLIN_HOME/lib is used.
+     *
+     * @throws RuntimeException if neither is available
+     */
+    public String getCoprocessorLocalJar() {
+        final String coprocessorJar = getOptional(COPROCESSOR_LOCAL_JAR);
+        if (StringUtils.isNotEmpty(coprocessorJar)) {
+            return coprocessorJar;
+        }
+        String kylinHome = getKylinHome();
+        if (StringUtils.isEmpty(kylinHome)) {
+            throw new RuntimeException("getCoprocessorLocalJar needs KYLIN_HOME");
+        }
+        return getFileName(kylinHome + File.separator + "lib", COPROCESSOR_JAR_NAME_PATTERN);
+    }
+
+    /**
+     * Returns the lexicographically greatest absolute path of a file in {@code homePath} whose
+     * name fully matches {@code pattern}.
+     *
+     * @throws RuntimeException if no file matches, or the directory cannot be listed
+     */
+    private static String getFileName(String homePath, Pattern pattern) {
+        File home = new File(homePath);
+        SortedSet<String> files = Sets.newTreeSet();
+        // listFiles() may return null (I/O error) even for an existing directory
+        File[] children = home.isDirectory() ? home.listFiles() : null;
+        if (children != null) {
+            for (File file : children) {
+                final Matcher matcher = pattern.matcher(file.getName());
+                if (matcher.matches()) {
+                    files.add(file.getAbsolutePath());
+                }
+            }
+        }
+        if (files.isEmpty()) {
+            throw new RuntimeException("cannot find " + pattern.toString() + " in " + homePath);
+        }
+        return files.last();
+    }
+
+    /** Overrides the coprocessor jar path process-wide, via a JVM system property. */
+    public void overrideCoprocessorLocalJar(String path) {
+        logger.info("override " + COPROCESSOR_LOCAL_JAR + " to " + path);
+        System.setProperty(COPROCESSOR_LOCAL_JAR, path);
+    }
+
+    public double getDefaultHadoopJobReducerInputMB() {
+        return Double.parseDouble(getOptional(KYLIN_JOB_MAPREDUCE_DEFAULT_REDUCE_INPUT_MB, "500"));
+    }
+
+    public double getDefaultHadoopJobReducerCountRatio() {
+        return Double.parseDouble(getOptional(KYLIN_JOB_MAPREDUCE_DEFAULT_REDUCE_COUNT_RATIO, "1.0"));
+    }
+
+    public int getHadoopJobMaxReducerNumber() {
+        return Integer.parseInt(getOptional(KYLIN_JOB_MAPREDUCE_MAX_REDUCER_NUMBER, "5000"));
+    }
+
+    /** True only if kylin.job.run.as.remote.cmd is "true" (case-insensitive); unset means false. */
+    public boolean getRunAsRemoteCommand() {
+        return Boolean.parseBoolean(getOptional(KYLIN_JOB_RUN_AS_REMOTE_CMD));
+    }
+
+    public int getRemoteHadoopCliPort() {
+        return Integer.parseInt(getOptional(KYLIN_JOB_REMOTE_CLI_PORT, "22"));
+    }
+
+    public String getRemoteHadoopCliHostname() {
+        return getOptional(KYLIN_JOB_REMOTE_CLI_HOSTNAME);
+    }
+
+    public String getRemoteHadoopCliUsername() {
+        return getOptional(KYLIN_JOB_REMOTE_CLI_USERNAME);
+    }
+
+    public String getRemoteHadoopCliPassword() {
+        return getOptional(KYLIN_JOB_REMOTE_CLI_PASSWORD);
+    }
+
+    public String getCliWorkingDir() {
+        return getOptional(KYLIN_JOB_REMOTE_CLI_WORKING_DIR);
+    }
+
+    public String getMapReduceCmdExtraArgs() {
+        return getOptional(KYLIN_JOB_CMD_EXTRA_ARGS);
+    }
+
+    /** Looks up "hive.table.location." plus the upper-cased table name. */
+    public String getOverrideHiveTableLocation(String table) {
+        return getOptional(HIVE_TABLE_LOCATION_PREFIX + table.toUpperCase());
+    }
+
+    public String getTempHDFSDir() {
+        return getOptional(KYLIN_TMP_HDFS_DIR, "/tmp/kylin");
+    }
+
+    public String getYarnStatusCheckUrl() {
+        return getOptional(KYLIN_JOB_YARN_APP_REST_CHECK_URL, null);
+    }
+
+    public int getYarnStatusCheckIntervalSeconds() {
+        return Integer.parseInt(getOptional(KYLIN_JOB_YARN_APP_REST_CHECK_INTERVAL_SECONDS, "60"));
+    }
+
+    public int getMaxConcurrentJobLimit() {
+        return Integer.parseInt(getOptional(KYLIN_JOB_CONCURRENT_MAX_LIMIT, "10"));
+    }
+
+    public String getTimeZone() {
+        return getOptional(KYLIN_REST_TIMEZONE, "PST");
+    }
+
+    public String[] getRestServers() {
+        return getOptionalStringArray(KYLIN_REST_SERVERS);
+    }
+
+    public String getAdminDls() {
+        return getOptional("kylin.job.admin.dls", null);
+    }
+
+    /** Job step timeout; the default is 2 * 60 * 60 (presumably seconds, i.e. 2 hours). */
+    public long getJobStepTimeout() {
+        return Long.parseLong(getOptional("kylin.job.step.timeout", String.valueOf(2 * 60 * 60)));
+    }
+
+    public String getServerMode() {
+        return this.getOptional("kylin.server.mode", "all");
+    }
+
+    public int getDictionaryMaxCardinality() {
+        return Integer.parseInt(getOptional("kylin.dictionary.max.cardinality", "5000000"));
+    }
+
+    public int getTableSnapshotMaxMB() {
+        return Integer.parseInt(getOptional("kylin.table.snapshot.max_mb", "300"));
+    }
+
+    public int getHBaseRegionCutMin() {
+        return Integer.parseInt(getOptional("kylin.job.hbase.region.cut.min", "2"));
+    }
+
+    public int getHBaseRegionCutMax() {
+        return Integer.parseInt(getOptional("kylin.job.hbase.region.cut.max", "1000"));
+    }
+
+    public int getScanThreshold() {
+        return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
+    }
+
+    public boolean getQueryRunLocalCoprocessor() {
+        return Boolean.parseBoolean(getOptional("kylin.query.run.local.coprocessor", "false"));
+    }
+
+    public Long getQueryDurationCacheThreshold() {
+        return Long.parseLong(this.getOptional("kylin.query.cache.threshold.duration", String.valueOf(2000)));
+    }
+
+    public Long getQueryScanCountCacheThreshold() {
+        return Long.parseLong(this.getOptional("kylin.query.cache.threshold.scancount", String.valueOf(10 * 1024)));
+    }
+
+    public boolean isQuerySecureEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.query.security.enabled", "false"));
+    }
+
+    public boolean isQueryCacheEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.query.cache.enabled", "true"));
+    }
+
+    public int getHBaseKeyValueSize() {
+        return Integer.parseInt(this.getOptional("kylin.hbase.client.keyvalue.maxsize", "10485760"));
+    }
+
+    public boolean isCubingInMem() {
+        return Boolean.parseBoolean(this.getOptional(KYLIN_JOB_CUBING_IN_MEM, "false"));
+    }
+
+    /** In-memory cubing sampling percent (default 5), clamped to [1, 100]. */
+    public int getCubingInMemSamplingPercent() {
+        int percent = Integer.parseInt(this.getOptional(KYLIN_JOB_CUBING_IN_MEM_SAMPLING_PERCENT, "5"));
+        percent = Math.max(percent, 1);
+        percent = Math.min(percent, 100);
+        return percent;
+    }
+
+    /** Returns the system property {@code prop} if set, else the file value (may be null). */
+    private String getOptional(String prop) {
+        final String property = System.getProperty(prop);
+        return property != null ? property : kylinConfig.getString(prop);
+    }
+
+    /**
+     * Returns the non-blank system property split on commas (surrounding whitespace trimmed),
+     * else the file value as a string array.
+     */
+    private String[] getOptionalStringArray(String prop) {
+        final String property = System.getProperty(prop);
+        if (!StringUtils.isBlank(property))
+            return property.split("\\s*,\\s*");
+
+        return kylinConfig.getStringArray(prop);
+    }
+
+    /** Returns the system property {@code prop} if set, else the file value, else {@code dft}. */
+    private String getOptional(String prop, String dft) {
+        final String property = System.getProperty(prop);
+        return property != null ? property : kylinConfig.getString(prop, dft);
+    }
+
+    /**
+     * Like {@link #getOptional(String)}, but an empty or missing file value is an error.
+     *
+     * @throws IllegalArgumentException if {@code prop} is not configured
+     */
+    private String getRequired(String prop) {
+        final String property = System.getProperty(prop);
+        if (property != null) {
+            return property;
+        }
+        String r = kylinConfig.getString(prop);
+        if (StringUtils.isEmpty(r)) {
+            throw new IllegalArgumentException("missing '" + prop + "' in conf/" + KYLIN_CONF_PROPERTIES_FILE);
+        }
+        return r;
+    }
+
+    /**
+     * Replaces the backing configuration with properties read from {@code is}, then refreshes
+     * the cached metadata and storage URLs. {@code is} is always closed.
+     *
+     * @throws RuntimeException if the properties cannot be parsed
+     */
+    void reloadKylinConfig(InputStream is) {
+        PropertiesConfiguration config = new PropertiesConfiguration();
+        try {
+            config.load(is);
+        } catch (ConfigurationException e) {
+            throw new RuntimeException("Cannot load kylin config.", e);
+        } finally {
+            try {
+                is.close();
+            } catch (IOException e) {
+                logger.error("Failed to close inputstream.", e);
+            }
+        }
+        this.kylinConfig = config;
+        this.metadataUrl = getOptional(KYLIN_METADATA_URL);
+        this.storageUrl = getOptional(KYLIN_STORAGE_URL);
+    }
+
+    /** Saves the file-backed properties (not system-property overrides) to {@code file}. */
+    public void writeProperties(File file) throws IOException {
+        try {
+            kylinConfig.save(file);
+        } catch (ConfigurationException ex) {
+            throw new IOException("Error writing KylinConfig to " + file, ex);
+        }
+    }
+
+    /** Returns the KYLIN_HOME environment variable, logging a warning when it is empty or unset. */
+    public static String getKylinHome() {
+        String kylinHome = System.getenv(KYLIN_HOME);
+        if (StringUtils.isEmpty(kylinHome)) {
+            logger.warn("KYLIN_HOME was not set");
+        }
+        return kylinHome;
+    }
+
+    /** Writes the file-backed properties to standard output. */
+    public void printProperties() throws IOException {
+        try {
+            kylinConfig.save(System.out);
+        } catch (ConfigurationException ex) {
+            throw new IOException("Error printing KylinConfig", ex);
+        }
+    }
+
+    /**
+     * Locates kylin.properties under the KYLIN_CONF system property, or else under
+     * $KYLIN_HOME/conf. The returned file may not exist.
+     */
+    private static File getKylinProperties() {
+        String kylinConfHome = System.getProperty(KYLIN_CONF);
+        if (!StringUtils.isEmpty(kylinConfHome)) {
+            logger.info("Use KYLIN_CONF=" + kylinConfHome);
+            return getKylinPropertiesFile(kylinConfHome);
+        }
+
+        logger.warn("KYLIN_CONF property was not set, will seek KYLIN_HOME env variable");
+
+        String kylinHome = getKylinHome();
+        if (StringUtils.isEmpty(kylinHome))
+            throw new RuntimeException("Didn't find KYLIN_CONF or KYLIN_HOME, please set one of them");
+
+        String path = kylinHome + File.separator + "conf";
+        return getKylinPropertiesFile(path);
+    }
+
+    /**
+     * Opens kylin.properties, see {@link #getKylinProperties()}. If a sibling
+     * {@code kylin.properties.override} exists, its entries are copied over the base file's and
+     * the merged result is returned from memory. (The misspelled name is kept for compatibility.)
+     *
+     * @throws RuntimeException if the file cannot be located or parsed
+     */
+    public static InputStream getKylinPropertiesAsInputSteam() {
+        File propFile = getKylinProperties();
+        if (propFile == null || !propFile.exists()) {
+            logger.error("fail to locate kylin.properties");
+            throw new RuntimeException("fail to locate kylin.properties");
+        }
+
+        File overrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override");
+        if (overrideFile.exists()) {
+            try {
+                PropertiesConfiguration conf = new PropertiesConfiguration();
+                conf.load(propFile);
+                PropertiesConfiguration override = new PropertiesConfiguration();
+                override.load(overrideFile);
+                conf.copy(override);
+                ByteArrayOutputStream bout = new ByteArrayOutputStream();
+                conf.save(bout);
+                return new ByteArrayInputStream(bout.toByteArray());
+            } catch (ConfigurationException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        try {
+            return new FileInputStream(propFile);
+        } catch (FileNotFoundException e) {
+            logger.error("this should not happen");
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Builds the kylin.properties file handle under {@code path}. Existence is not checked here.
+     *
+     * @param path directory to look in; may be null
+     * @return the properties file, or null when {@code path} is null
+     */
+    private static File getKylinPropertiesFile(String path) {
+        if (path == null) {
+            return null;
+        }
+
+        return new File(path, KYLIN_CONF_PROPERTIES_FILE);
+    }
+
+    public String getMetadataUrl() {
+        return metadataUrl;
+    }
+
+    /**
+     * Returns the part before the first '@' when the metadata URL contains "@hbase"
+     * (case-insensitive), e.g. "my_meta@hbase" gives "my_meta". Otherwise returns
+     * "kylin_metadata".
+     */
+    public String getMetadataUrlPrefix() {
+        String hbaseMetadataUrl = getMetadataUrl();
+        String defaultPrefix = "kylin_metadata";
+
+        if (org.apache.commons.lang3.StringUtils.containsIgnoreCase(hbaseMetadataUrl, "@hbase")) {
+            int cut = hbaseMetadataUrl.indexOf('@');
+            String tmp = cut < 0 ? defaultPrefix : hbaseMetadataUrl.substring(0, cut);
+            return tmp;
+        } else {
+            return defaultPrefix;
+        }
+    }
+
+    public void setMetadataUrl(String metadataUrl) {
+        kylinConfig.setProperty(KYLIN_METADATA_URL, metadataUrl);
+        this.metadataUrl = metadataUrl;
+    }
+
+    public void setStorageUrl(String storageUrl) {
+        kylinConfig.setProperty(KYLIN_STORAGE_URL, storageUrl);
+        this.storageUrl = storageUrl;
+    }
+
+    public void setRunAsRemoteCommand(String v) {
+        kylinConfig.setProperty(KYLIN_JOB_RUN_AS_REMOTE_CMD, v);
+    }
+
+    public void setRemoteHadoopCliHostname(String v) {
+        kylinConfig.setProperty(KYLIN_JOB_REMOTE_CLI_HOSTNAME, v);
+    }
+
+    public void setRemoteHadoopCliUsername(String v) {
+        kylinConfig.setProperty(KYLIN_JOB_REMOTE_CLI_USERNAME, v);
+    }
+
+    public void setRemoteHadoopCliPassword(String v) {
+        kylinConfig.setProperty(KYLIN_JOB_REMOTE_CLI_PASSWORD, v);
+    }
+
+    /** Reads {@code key} from the file-backed properties only (no system-property override). */
+    public String getProperty(String key, String defaultValue) {
+        return kylinConfig.getString(key, defaultValue);
+    }
+
+    /**
+     * Set a new key:value into the kylin config.
+     * NOTE(review): the value is logged verbatim, including for password keys.
+     *
+     * @param key   property key
+     * @param value new value
+     */
+    public void setProperty(String key, String value) {
+        logger.info("Kylin Config was updated with " + key + " : " + value);
+        kylinConfig.setProperty(key, value);
+    }
+
+    /** Serializes the file-backed properties to a String, using the platform default charset. */
+    public String getConfigAsString() throws IOException {
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            kylinConfig.save(baos);
+            String content = baos.toString();
+            return content;
+        } catch (ConfigurationException ex) {
+            throw new IOException("Error writing KylinConfig to String", ex);
+        }
+    }
+
+    public String getSparkHome() {
+        return kylinConfig.getString(SPARK_HOME);
+    }
+
+    public String getSparkMaster() {
+        return kylinConfig.getString(SPARK_MASTER);
+    }
+
+    /**
+     * Returns the HBase region cut for a capacity name: SMALL (default 5), MEDIUM (default 10)
+     * or LARGE (default 50).
+     *
+     * @throws IllegalArgumentException if {@code capacity} is not one of those names
+     * @throws NumberFormatException    if the configured value is not an integer
+     */
+    public int getHBaseRegionCut(String capacity) {
+        String cut;
+        switch (capacity) {
+        case "SMALL":
+            cut = getProperty(HBASE_REGION_CUT_SMALL, "5");
+            break;
+        case "MEDIUM":
+            cut = getProperty(HBASE_REGION_CUT_MEDIUM, "10");
+            break;
+        case "LARGE":
+            cut = getProperty(HBASE_REGION_CUT_LARGE, "50");
+            break;
+        default:
+            throw new IllegalArgumentException("Capacity not recognized: " + capacity);
+        }
+
+        return Integer.parseInt(cut);
+    }
+
+    @Override
+    public String toString() {
+        return getMetadataUrl();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
new file mode 100644
index 0000000..186fd42
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
@@ -0,0 +1,10 @@
+package org.apache.kylin.common.cache;
+
+import org.apache.kylin.common.restclient.AbstractRestCache;
+import org.apache.kylin.common.restclient.Broadcaster;
+
+/**
+ * Strategy for reacting to a cache change event. Implementations decide whether to apply the
+ * change to the given cache directly or to forward it elsewhere.
+ */
+public interface CacheUpdater {
+
+    /**
+     * Handles one cache change.
+     *
+     * @param key        the cache key that changed
+     * @param value      the new value (meaningful for create/update events)
+     * @param syncAction the kind of change
+     * @param type       the category of cached entity
+     * @param cache      the cache the event concerns
+     */
+    void updateCache(Object key, Object value, Broadcaster.EVENT syncAction,
+            Broadcaster.TYPE type, AbstractRestCache cache);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
new file mode 100644
index 0000000..8f629e3
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
@@ -0,0 +1,17 @@
+package org.apache.kylin.common.cache;
+
+import org.apache.kylin.common.restclient.AbstractRestCache;
+import org.apache.kylin.common.restclient.Broadcaster;
+
+/**
+ * {@link CacheUpdater} that applies events straight to the supplied cache's local entries.
+ */
+public class LocalCacheUpdater implements CacheUpdater {
+
+    /**
+     * CREATE and UPDATE store {@code value} under {@code key} via {@code putLocal}. DROP removes
+     * {@code key} via {@code removeLocal}. Any other event, or a null event, is ignored.
+     * {@code type} is not used.
+     */
+    @Override
+    public void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache) {
+        final boolean isUpsert = syncAction == Broadcaster.EVENT.CREATE || syncAction == Broadcaster.EVENT.UPDATE;
+        if (isUpsert) {
+            cache.putLocal(key, value);
+            return;
+        }
+        if (syncAction == Broadcaster.EVENT.DROP) {
+            cache.removeLocal(key);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
new file mode 100644
index 0000000..2a5e59a
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
@@ -0,0 +1,13 @@
+package org.apache.kylin.common.cache;
+
+import org.apache.kylin.common.restclient.AbstractRestCache;
+import org.apache.kylin.common.restclient.Broadcaster;
+
+/**
+ * {@link CacheUpdater} that leaves the supplied cache untouched. Instead it queues the event on
+ * the {@link Broadcaster} singleton, identified by type, action and the key's string form.
+ */
+public class RemoteCacheUpdater implements CacheUpdater {
+
+    /** Queues the event; {@code value} and {@code cache} are not used. */
+    @Override
+    public void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache) {
+        final Broadcaster broadcaster = Broadcaster.getInstance();
+        broadcaster.queue(type.getType(), syncAction.getType(), key.toString());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
new file mode 100644
index 0000000..560a836
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.debug;
+
+import java.util.Map;
+
/**
 * Per-thread debug/profiling switches ("backdoor toggles").
 *
 * <p>A caller installs a map of toggle name to value for the current thread with
 * {@link #setToggles(Map)} and removes it with {@link #cleanToggles()}. While no map is
 * installed, string toggles read as {@code null} and boolean toggles read as {@code false}.
 * A boolean toggle is on only when its value is exactly the string {@code "true"}
 * (case-sensitive).
 */
public class BackdoorToggles {

    /**
     * Set DEBUG_TOGGLE_DISABLE_FUZZY_KEY=true to disable fuzzy key for debug/profile usage.
     *
     * <p>Example (put it into the request body):
     * <pre>
     * "backdoorToggles": {
     *     "DEBUG_TOGGLE_DISABLE_FUZZY_KEY": "true"
     * }
     * </pre>
     */
    public static final String DEBUG_TOGGLE_DISABLE_FUZZY_KEY = "DEBUG_TOGGLE_DISABLE_FUZZY_KEY";

    /**
     * Set DEBUG_TOGGLE_OBSERVER_BEHAVIOR=SCAN/SCAN_FILTER/SCAN_FILTER_AGGR to control observer
     * behavior for debug/profile usage.
     *
     * <p>Example (put it into the request body):
     * <pre>
     * "backdoorToggles": {
     *     "DEBUG_TOGGLE_OBSERVER_BEHAVIOR": "SCAN"
     * }
     * </pre>
     */
    public static final String DEBUG_TOGGLE_OBSERVER_BEHAVIOR = "DEBUG_TOGGLE_OBSERVER_BEHAVIOR";

    /**
     * Set DEBUG_TOGGLE_LOCAL_COPROCESSOR=true to run coprocessor at client side (not in HBase
     * region server).
     *
     * <p>Example (put it into the request body):
     * <pre>
     * "backdoorToggles": {
     *     "DEBUG_TOGGLE_LOCAL_COPROCESSOR": "true"
     * }
     * </pre>
     */
    public static final String DEBUG_TOGGLE_LOCAL_COPROCESSOR = "DEBUG_TOGGLE_LOCAL_COPROCESSOR";

    /** Toggle map of the current thread; unset (null) means "no toggles". */
    private static final ThreadLocal<Map<String, String>> TOGGLES = new ThreadLocal<Map<String, String>>();

    /** Installs {@code toggles} as the current thread's toggle map. */
    public static void setToggles(Map<String, String> toggles) {
        TOGGLES.set(toggles);
    }

    /** Removes the current thread's toggle map so later reads see no toggles. */
    public static void cleanToggles() {
        TOGGLES.remove();
    }

    /** @return the raw DEBUG_TOGGLE_OBSERVER_BEHAVIOR value, or {@code null} if unset */
    public static String getObserverBehavior() {
        return getString(DEBUG_TOGGLE_OBSERVER_BEHAVIOR);
    }

    /** @return {@code true} iff DEBUG_TOGGLE_DISABLE_FUZZY_KEY is exactly "true" */
    public static boolean getDisableFuzzyKey() {
        return getBoolean(DEBUG_TOGGLE_DISABLE_FUZZY_KEY);
    }

    /** @return {@code true} iff DEBUG_TOGGLE_LOCAL_COPROCESSOR is exactly "true" */
    public static boolean getRunLocalCoprocessor() {
        return getBoolean(DEBUG_TOGGLE_LOCAL_COPROCESSOR);
    }

    /** Looks up {@code key} in the current thread's map; {@code null} when no map is installed. */
    private static String getString(String key) {
        Map<String, String> current = TOGGLES.get();
        return current == null ? null : current.get(key);
    }

    /** Case-sensitive check that the toggle's value is the literal "true". */
    private static boolean getBoolean(String key) {
        String raw = getString(key);
        return raw != null && raw.equals("true");
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
new file mode 100644
index 0000000..50babd9
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.hll;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+import org.apache.kylin.common.util.BytesUtil;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+/**
+ * About compression, test on HLLC data shows
+ *
+ * - LZF compression ratio is around 65%-80%, fast
+ * - GZIP compression ratio is around 41%-46%, very slow
+ *
+ * @author yangli9
+ */
+public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter> {
+
+ private final int p;
+ private final int m;
+ private final HashFunction hashFunc;
+ byte[] registers;
+
+ public HyperLogLogPlusCounter() {
+ this(10);
+ }
+
+ public HyperLogLogPlusCounter(int p) {
+ this(p, Hashing.murmur3_128());
+ }
+
+ public HyperLogLogPlusCounter(HyperLogLogPlusCounter another) {
+ this(another.p, another.hashFunc);
+ merge(another);
+ }
+
+ /** The larger p is, the more storage (2^p bytes), the better accuracy */
+ private HyperLogLogPlusCounter(int p, HashFunction hashFunc) {
+ this.p = p;
+ this.m = 1 << p;//(int) Math.pow(2, p);
+ this.hashFunc = hashFunc;
+ this.registers = new byte[m];
+ }
+
+ public void clear() {
+ byte zero = (byte) 0;
+ Arrays.fill(registers, zero);
+ }
+
+ public void add(int value) {
+ add(hashFunc.hashInt(value).asLong());
+ }
+
+ public void add(String value) {
+ add(hashFunc.hashString(value, Charset.defaultCharset()).asLong());
+ }
+
+ public void add(byte[] value) {
+ add(hashFunc.hashBytes(value).asLong());
+ }
+
+ public void add(byte[] value, int offset, int length) {
+ add(hashFunc.hashBytes(value, offset, length).asLong());
+ }
+
+ protected void add(long hash) {
+ int bucketMask = m - 1;
+ int bucket = (int) (hash & bucketMask);
+ int firstOnePos = Long.numberOfLeadingZeros(hash | bucketMask) + 1;
+
+ if (firstOnePos > registers[bucket])
+ registers[bucket] = (byte) firstOnePos;
+ }
+
+ public void merge(HyperLogLogPlusCounter another) {
+ assert this.p == another.p;
+ assert this.hashFunc == another.hashFunc;
+
+ for (int i = 0; i < m; i++) {
+ if (registers[i] < another.registers[i])
+ registers[i] = another.registers[i];
+ }
+ }
+
+ public long getCountEstimate() {
+ return new HLLCSnapshot(this).getCountEstimate();
+ }
+
+ public double getErrorRate() {
+ return 1.04 / Math.sqrt(m);
+ }
+
+ private int size() {
+ int size = 0;
+ for (int i = 0; i < m; i++) {
+ if (registers[i] > 0)
+ size++;
+ }
+ return size;
+ }
+
+ @Override
+ public String toString() {
+ return "" + getCountEstimate();
+ }
+
+ // ============================================================================
+
+ // a memory efficient snapshot of HLL registers which can yield count
+ // estimate later
+ public static class HLLCSnapshot {
+ byte p;
+ double registerSum;
+ int zeroBuckets;
+
+ public HLLCSnapshot(HyperLogLogPlusCounter hllc) {
+ p = (byte) hllc.p;
+ registerSum = 0;
+ zeroBuckets = 0;
+
+ byte[] registers = hllc.registers;
+ for (int i = 0; i < hllc.m; i++) {
+ if (registers[i] == 0) {
+ registerSum++;
+ zeroBuckets++;
+ } else {
+ registerSum += 1.0 / (1 << registers[i]);
+ }
+ }
+ }
+
+ public long getCountEstimate() {
+ int m = (int) Math.pow(2, p);
+ double alpha = 1 / (2 * Math.log(2) * (1 + (3 * Math.log(2) - 1) / m));
+ double alphaMM = alpha * m * m;
+ double estimate = alphaMM / registerSum;
+
+ // small cardinality adjustment
+ if (zeroBuckets >= m * 0.07) { // (reference presto's HLL impl)
+ estimate = m * Math.log(m * 1.0 / zeroBuckets);
+ } else if (HyperLogLogPlusTable.isBiasCorrection(m, estimate)) {
+ estimate = HyperLogLogPlusTable.biasCorrection(p, estimate);
+ }
+
+ return Math.round(estimate);
+ }
+ }
+
+ // ============================================================================
+
+ public void writeRegisters(final ByteBuffer out) throws IOException {
+
+ final int indexLen = getRegisterIndexSize();
+ int size = size();
+
+ // decide output scheme -- map (3*size bytes) or array (2^p bytes)
+ byte scheme;
+ if (5 + (indexLen + 1) * size < m) // 5 is max len of vint
+ scheme = 0; // map
+ else
+ scheme = 1; // array
+ out.put(scheme);
+
+ if (scheme == 0) { // map scheme
+ BytesUtil.writeVInt(size, out);
+ for (int i = 0; i < m; i++) {
+ if (registers[i] > 0) {
+ writeUnsigned(i, indexLen, out);
+ out.put(registers[i]);
+ }
+ }
+ } else if (scheme == 1) { // array scheme
+ out.put(registers);
+ } else
+ throw new IllegalStateException();
+ }
+
+ public void readRegisters(ByteBuffer in) throws IOException {
+ byte scheme = in.get();
+
+ if (scheme == 0) { // map scheme
+ clear();
+ int size = BytesUtil.readVInt(in);
+ if (size > m)
+ throw new IllegalArgumentException("register size (" + size + ") cannot be larger than m (" + m + ")");
+ int indexLen = getRegisterIndexSize();
+ for (int i = 0; i < size; i++) {
+ int key = readUnsigned(in, indexLen);
+ registers[key] = in.get();
+ }
+ } else if (scheme == 1) { // array scheme
+ in.get(registers);
+ } else
+ throw new IllegalStateException();
+ }
+
+ public int peekLength(ByteBuffer in) {
+ int mark = in.position();
+ int len;
+
+ byte scheme = in.get();
+ if (scheme == 0) { // map scheme
+ int size = BytesUtil.readVInt(in);
+ int indexLen = getRegisterIndexSize();
+ len = in.position() - mark + (indexLen + 1) * size;
+ } else {
+ len = in.position() - mark + m;
+ }
+
+ in.position(mark);
+ return len;
+ }
+
+ public int maxLength() {
+ return 1 + m;
+ }
+
+ public void writeRegistersArray(final ByteBuffer out) {
+ out.put(this.registers);
+ }
+
+ public void readRegistersArray(ByteBuffer in) {
+ in.get(registers, 0, m);
+ }
+
+ private int getRegisterIndexSize() {
+ return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((hashFunc == null) ? 0 : hashFunc.hashCode());
+ result = prime * result + p;
+ result = prime * result + Arrays.hashCode(registers);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ HyperLogLogPlusCounter other = (HyperLogLogPlusCounter) obj;
+ if (hashFunc == null) {
+ if (other.hashFunc != null)
+ return false;
+ } else if (!hashFunc.equals(other.hashFunc))
+ return false;
+ if (p != other.p)
+ return false;
+ if (!Arrays.equals(registers, other.registers))
+ return false;
+ return true;
+ }
+
+ @Override
+ public int compareTo(HyperLogLogPlusCounter o) {
+ if (o == null)
+ return 1;
+
+ long e1 = this.getCountEstimate();
+ long e2 = o.getCountEstimate();
+
+ if (e1 == e2)
+ return 0;
+ else if (e1 > e2)
+ return 1;
+ else
+ return -1;
+ }
+
+ public static void main(String[] args) throws IOException {
+ dumpErrorRates();
+ }
+
+ static void dumpErrorRates() {
+ for (int p = 10; p <= 18; p++) {
+ double rate = new HyperLogLogPlusCounter(p).getErrorRate();
+ double er = Math.round(rate * 10000) / 100D;
+ double er2 = Math.round(rate * 2 * 10000) / 100D;
+ double er3 = Math.round(rate * 3 * 10000) / 100D;
+ long size = Math.round(Math.pow(2, p));
+ System.out.println("HLLC" + p + ",\t" + size + " bytes,\t68% err<" + er + "%" + ",\t95% err<" + er2 + "%" + ",\t99.7% err<" + er3 + "%");
+ }
+ }
+
+ /**
+ *
+ * @param num
+ * @param size
+ * @param out
+ */
+ public static void writeUnsigned(int num, int size, ByteBuffer out) {
+ for (int i = 0; i < size; i++) {
+ out.put((byte) num);
+ num >>>= 8;
+ }
+ }
+
+ public static int readUnsigned(ByteBuffer in, int size) {
+ int integer = 0;
+ int mask = 0xff;
+ int shift = 0;
+ for (int i = 0; i < size; i++) {
+ integer |= (in.get() << shift) & mask;
+ mask = mask << 8;
+ shift += 8;
+ }
+ return integer;
+ }
+}