Posted to commits@kylin.apache.org by sh...@apache.org on 2018/08/24 09:36:18 UTC
[kylin] 01/02: KYLIN-2565, upgrade to Hadoop3.0
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master-hadoop3.1-2.5.0
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 9f0227e642abba0f366b97ddafbec9180fb5f7fb
Author: Cheng Wang <ch...@kyligence.io>
AuthorDate: Tue Apr 25 18:45:57 2017 +0800
KYLIN-2565, upgrade to Hadoop3.0
---
.../mr/common/DefaultSslProtocolSocketFactory.java | 150 ------
.../kylin/engine/mr/common/HadoopStatusGetter.java | 280 ++++++++++
.../apache/kylin/engine/spark/SparkCountDemo.java | 80 +++
.../org/apache/kylin/engine/spark/SparkCubing.java | 591 +++++++++++++++++++++
pom.xml | 19 +-
server-base/pom.xml | 5 +
.../org/apache/kylin/rest/security/MockHTable.java | 112 ++--
.../kylin/storage/hbase/HBaseConnection.java | 5 +
.../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 15 +-
.../v2/coprocessor/endpoint/CubeVisitService.java | 4 +-
.../kylin/storage/hbase/steps/CubeHFileJob.java | 12 +
.../storage/hbase/steps/HBaseCuboidWriter.java | 133 +++++
.../kylin/storage/hbase/util/CubeMigrationCLI.java | 2 +-
.../storage/hbase/util/DeployCoprocessorCLI.java | 3 +-
.../storage/hbase/util/ExtendCubeToHybridCLI.java | 2 +-
.../hbase/util/GridTableHBaseBenchmark.java | 2 +-
.../kylin/storage/hbase/util/PingHBaseCLI.java | 3 +-
.../storage/hbase/steps/CubeHFileMapperTest.java | 22 +-
.../kylin/storage/hbase/steps/TestHbaseClient.java | 14 +-
.../org/apache/kylin/tool/CubeMigrationCLI.java | 14 +-
.../apache/kylin/tool/CubeMigrationCheckCLI.java | 17 +-
.../apache/kylin/tool/ExtendCubeToHybridCLI.java | 2 +-
.../org/apache/kylin/tool/StorageCleanupJob.java | 1 +
23 files changed, 1240 insertions(+), 248 deletions(-)
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java
deleted file mode 100644
index d66e4eb..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.common;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.UnknownHostException;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-
-import org.apache.commons.httpclient.ConnectTimeoutException;
-import org.apache.commons.httpclient.HttpClientError;
-import org.apache.commons.httpclient.params.HttpConnectionParams;
-import org.apache.commons.httpclient.protocol.ControllerThreadSocketFactory;
-import org.apache.commons.httpclient.protocol.SecureProtocolSocketFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xduo
- *
- */
-public class DefaultSslProtocolSocketFactory implements SecureProtocolSocketFactory {
- /** Log object for this class. */
- private static Logger logger = LoggerFactory.getLogger(DefaultSslProtocolSocketFactory.class);
- private SSLContext sslcontext = null;
-
- /**
- * Constructor for DefaultSslProtocolSocketFactory.
- */
- public DefaultSslProtocolSocketFactory() {
- super();
- }
-
- /**
- * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int,java.net.InetAddress,int)
- */
- public Socket createSocket(String host, int port, InetAddress clientHost, int clientPort) throws IOException, UnknownHostException {
- return getSSLContext().getSocketFactory().createSocket(host, port, clientHost, clientPort);
- }
-
- /**
- * Attempts to get a new socket connection to the given host within the
- * given time limit.
- *
- * <p>
- * To circumvent the limitations of older JREs that do not support connect
- * timeout a controller thread is executed. The controller thread attempts
- * to create a new socket within the given limit of time. If socket
- * constructor does not return until the timeout expires, the controller
- * terminates and throws an {@link ConnectTimeoutException}
- * </p>
- *
- * @param host
- * the host name/IP
- * @param port
- * the port on the host
- * @param localAddress
- * the local host name/IP to bind the socket to
- * @param localPort
- * the port on the local machine
- * @param params
- * {@link HttpConnectionParams Http connection parameters}
- *
- * @return Socket a new socket
- *
- * @throws IOException
- * if an I/O error occurs while creating the socket
- * @throws UnknownHostException
- * if the IP address of the host cannot be determined
- * @throws ConnectTimeoutException
- * DOCUMENT ME!
- * @throws IllegalArgumentException
- * DOCUMENT ME!
- */
- public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort, final HttpConnectionParams params) throws IOException, UnknownHostException, ConnectTimeoutException {
- if (params == null) {
- throw new IllegalArgumentException("Parameters may not be null");
- }
-
- int timeout = params.getConnectionTimeout();
-
- if (timeout == 0) {
- return createSocket(host, port, localAddress, localPort);
- } else {
- // To be eventually deprecated when migrated to Java 1.4 or above
- return ControllerThreadSocketFactory.createSocket(this, host, port, localAddress, localPort, timeout);
- }
- }
-
- /**
- * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int)
- */
- public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
- return getSSLContext().getSocketFactory().createSocket(host, port);
- }
-
- /**
- * @see SecureProtocolSocketFactory#createSocket(java.net.Socket,java.lang.String,int,boolean)
- */
- public Socket createSocket(Socket socket, String host, int port, boolean autoClose) throws IOException, UnknownHostException {
- return getSSLContext().getSocketFactory().createSocket(socket, host, port, autoClose);
- }
-
- public boolean equals(Object obj) {
- return ((obj != null) && obj.getClass().equals(DefaultX509TrustManager.class));
- }
-
- public int hashCode() {
- return DefaultX509TrustManager.class.hashCode();
- }
-
- private static SSLContext createEasySSLContext() {
- try {
- SSLContext context = SSLContext.getInstance("TLS");
- context.init(null, new TrustManager[] { new DefaultX509TrustManager(null) }, null);
-
- return context;
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- throw new HttpClientError(e.toString());
- }
- }
-
- private SSLContext getSSLContext() {
- if (this.sslcontext == null) {
- this.sslcontext = createEasySSLContext();
- }
-
- return this.sslcontext;
- }
-}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
new file mode 100644
index 0000000..0245c1c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.nio.charset.Charset;
+import java.security.KeyManagementException;
+import java.security.Principal;
+import java.security.SecureRandom;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.http.Header;
+import org.apache.http.HttpResponse;
+import org.apache.http.auth.AuthSchemeRegistry;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.Credentials;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.params.AuthPolicy;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.conn.scheme.Scheme;
+import org.apache.http.conn.scheme.SchemeRegistry;
+import org.apache.http.conn.ssl.SSLSocketFactory;
+import org.apache.http.impl.auth.SPNegoSchemeFactory;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ */
+public class HadoopStatusGetter {
+
+ private final String mrJobId;
+ private final String yarnUrl;
+
+ protected static final Logger logger = LoggerFactory.getLogger(HadoopStatusGetter.class);
+
+ public HadoopStatusGetter(String yarnUrl, String mrJobId) {
+ this.yarnUrl = yarnUrl;
+ this.mrJobId = mrJobId;
+ }
+
+ public Pair<RMAppState, FinalApplicationStatus> get(boolean useKerberosAuth) throws IOException {
+ String applicationId = mrJobId.replace("job", "application");
+ String url = yarnUrl.replace("${job_id}", applicationId);
+ String response = useKerberosAuth ? getHttpResponseWithKerberosAuth(url) : getHttpResponse(url);
+ logger.debug("Hadoop job " + mrJobId + " status : " + response);
+ JsonNode root = new ObjectMapper().readTree(response);
+ RMAppState state = RMAppState.valueOf(root.findValue("state").textValue());
+ FinalApplicationStatus finalStatus = FinalApplicationStatus.valueOf(root.findValue("finalStatus").textValue());
+ return Pair.of(state, finalStatus);
+ }
+
+ private static String DEFAULT_KRB5_CONFIG_LOCATION = "/etc/krb5.conf";
+
+ private String getHttpResponseWithKerberosAuth(String url) throws IOException {
+ String krb5ConfigPath = System.getProperty("java.security.krb5.conf");
+ if (krb5ConfigPath == null) {
+ krb5ConfigPath = DEFAULT_KRB5_CONFIG_LOCATION;
+ }
+ boolean skipPortAtKerberosDatabaseLookup = true;
+ System.setProperty("java.security.krb5.conf", krb5ConfigPath);
+ System.setProperty("sun.security.krb5.debug", "true");
+ System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
+
+ DefaultHttpClient client = new DefaultHttpClient();
+ AuthSchemeRegistry authSchemeRegistry = new AuthSchemeRegistry();
+ authSchemeRegistry.register(AuthPolicy.SPNEGO, new SPNegoSchemeFactory(skipPortAtKerberosDatabaseLookup));
+ client.setAuthSchemes(authSchemeRegistry);
+
+ BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ Credentials useJaasCreds = new Credentials() {
+ public String getPassword() {
+ return null;
+ }
+
+ public Principal getUserPrincipal() {
+ return null;
+ }
+ };
+ credentialsProvider.setCredentials(new AuthScope(null, -1, null), useJaasCreds);
+ client.setCredentialsProvider(credentialsProvider);
+
+ String response = null;
+ while (response == null) {
+ if (url.startsWith("https://")) {
+ registerEasyHttps(client);
+ }
+ if (url.contains("anonymous=true") == false) {
+ url += url.contains("?") ? "&" : "?";
+ url += "anonymous=true";
+ }
+ HttpGet httpget = new HttpGet(url);
+ httpget.addHeader("accept", "application/json");
+ try {
+ HttpResponse httpResponse = client.execute(httpget);
+ String redirect = null;
+ org.apache.http.Header h = httpResponse.getFirstHeader("Location");
+ if (h != null) {
+ redirect = h.getValue();
+ if (isValidURL(redirect) == false) {
+ logger.info("Get invalid redirect url, skip it: " + redirect);
+ Thread.sleep(1000L);
+ continue;
+ }
+ } else {
+ h = httpResponse.getFirstHeader("Refresh");
+ if (h != null) {
+ String s = h.getValue();
+ int cut = s.indexOf("url=");
+ if (cut >= 0) {
+ redirect = s.substring(cut + 4);
+
+ if (isValidURL(redirect) == false) {
+ logger.info("Get invalid redirect url, skip it: " + redirect);
+ Thread.sleep(1000L);
+ continue;
+ }
+ }
+ }
+ }
+
+ if (redirect == null) {
+ response = IOUtils.toString(httpResponse.getEntity().getContent(), Charset.defaultCharset());
+ logger.debug("Job " + mrJobId + " get status check result.\n");
+ } else {
+ url = redirect;
+ logger.debug("Job " + mrJobId + " check redirect url " + url + ".\n");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error(e.getMessage());
+ } finally {
+ httpget.releaseConnection();
+ }
+ }
+
+ return response;
+ }
+
+ private String getHttpResponse(String url) throws IOException {
+ HttpClient client = new DefaultHttpClient();
+
+ String response = null;
+ while (response == null) { // follow redirects via 'refresh'
+ if (url.startsWith("https://")) {
+ registerEasyHttps(client);
+ }
+ if (url.contains("anonymous=true") == false) {
+ url += url.contains("?") ? "&" : "?";
+ url += "anonymous=true";
+ }
+
+ HttpGet get = new HttpGet(url);
+ get.addHeader("accept", "application/json");
+
+ try {
+ HttpResponse res = client.execute(get);
+
+ String redirect = null;
+ Header h = res.getFirstHeader("Location");
+ if (h != null) {
+ redirect = h.getValue();
+ if (isValidURL(redirect) == false) {
+ logger.info("Get invalid redirect url, skip it: " + redirect);
+ Thread.sleep(1000L);
+ continue;
+ }
+ } else {
+ h = res.getFirstHeader("Refresh");
+ if (h != null) {
+ String s = h.getValue();
+ int cut = s.indexOf("url=");
+ if (cut >= 0) {
+ redirect = s.substring(cut + 4);
+
+ if (isValidURL(redirect) == false) {
+ logger.info("Get invalid redirect url, skip it: " + redirect);
+ Thread.sleep(1000L);
+ continue;
+ }
+ }
+ }
+ }
+
+ if (redirect == null) {
+ response = res.getStatusLine().toString();
+ logger.debug("Job " + mrJobId + " get status check result.\n");
+ } else {
+ url = redirect;
+ logger.debug("Job " + mrJobId + " check redirect url " + url + ".\n");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error(e.getMessage());
+ } finally {
+ get.releaseConnection();
+ }
+ }
+
+ return response;
+ }
+
+ private static void registerEasyHttps(HttpClient client) {
+ SSLContext sslContext;
+ try {
+ sslContext = SSLContext.getInstance("SSL");
+
+ // set up a TrustManager that trusts everything
+ try {
+ sslContext.init(null, new TrustManager[] { new DefaultX509TrustManager(null) {
+ public X509Certificate[] getAcceptedIssuers() {
+ logger.debug("getAcceptedIssuers");
+ return null;
+ }
+
+ public void checkClientTrusted(X509Certificate[] certs, String authType) {
+ logger.debug("checkClientTrusted");
+ }
+
+ public void checkServerTrusted(X509Certificate[] certs, String authType) {
+ logger.debug("checkServerTrusted");
+ }
+ } }, new SecureRandom());
+ } catch (KeyManagementException e) {
+ }
+ SSLSocketFactory ssf = new SSLSocketFactory(sslContext, SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
+ ClientConnectionManager ccm = client.getConnectionManager();
+ SchemeRegistry sr = ccm.getSchemeRegistry();
+ sr.register(new Scheme("https", 443, ssf));
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+
+ private static boolean isValidURL(String value) {
+ if (StringUtils.isNotEmpty(value)) {
+ java.net.URL url;
+ try {
+ url = new java.net.URL(value);
+ } catch (MalformedURLException var5) {
+ return false;
+ }
+
+ return StringUtils.isNotEmpty(url.getProtocol()) && StringUtils.isNotEmpty(url.getHost());
+ }
+
+ return false;
+ }
+
+}
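For context (not part of this commit): the get() method above polls the YARN ResourceManager application REST endpoint and pulls the "state" and "finalStatus" fields out of the JSON body. Below is a minimal standalone sketch of that parsing step, assuming the usual RM response shape with an "app" wrapper object and jackson-databind on the classpath; the class name is illustrative.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class StatusParseDemo {
        public static void main(String[] args) throws Exception {
            // sample body in the shape returned by /ws/v1/cluster/apps/{appid}
            String sample = "{\"app\":{\"state\":\"FINISHED\",\"finalStatus\":\"SUCCEEDED\"}}";
            JsonNode root = new ObjectMapper().readTree(sample);
            // findValue() searches the tree recursively, so the "app" wrapper is
            // transparent, matching how HadoopStatusGetter reads the two fields
            String state = root.findValue("state").textValue();
            String finalStatus = root.findValue("finalStatus").textValue();
            System.out.println(state + " / " + finalStatus); // FINISHED / SUCCEEDED
        }
    }

The two strings are then mapped onto the RMAppState and FinalApplicationStatus enums via valueOf(), which is why an unexpected or renamed state name would surface in get() as an IllegalArgumentException.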
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
new file mode 100644
index 0000000..a079a57
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+
+import scala.Tuple2;
+
+/**
+ */
+public class SparkCountDemo extends AbstractApplication {
+
+ private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
+
+ private Options options;
+
+ public SparkCountDemo() {
+ options = new Options();
+ // options.addOption(OPTION_INPUT_PATH);
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ String logFile = "hdfs://10.249.65.231:8020/tmp/kylin.properties"; // Should be some file on your system
+ SparkConf conf = new SparkConf().setAppName("Simple Application");
+ JavaSparkContext sc = new JavaSparkContext(conf);
+ final JavaPairRDD<String, Integer> logData = sc.textFile(logFile).mapToPair(new PairFunction<String, String, Integer>() {
+
+ @Override
+ public Tuple2<String, Integer> call(String s) throws Exception {
+ return new Tuple2<String, Integer>(s, s.length());
+ }
+ }).sortByKey();
+ logData.persist(StorageLevel.MEMORY_AND_DISK_SER());
+
+ System.out.println("line number:" + logData.count());
+
+ logData.mapToPair(new PairFunction<Tuple2<String, Integer>, ImmutableBytesWritable, KeyValue>() {
+ @Override
+ public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
+ ImmutableBytesWritable key = new ImmutableBytesWritable(stringIntegerTuple2._1().getBytes());
+ KeyValue value = new KeyValue(stringIntegerTuple2._1().getBytes(), "f".getBytes(), "c".getBytes(), String.valueOf(stringIntegerTuple2._2()).getBytes());
+ return new Tuple2(key, value);
+ }
+ }).saveAsNewAPIHadoopFile("hdfs://10.249.65.231:8020/tmp/hfile", ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class);
+
+ }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
new file mode 100644
index 0000000..a87d66b
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -0,0 +1,591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.cube.util.CubingUtils;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
+import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
+import org.apache.kylin.engine.spark.util.IteratorUtils;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.steps.CreateHTableJob;
+import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkFiles;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.hive.HiveContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.primitives.UnsignedBytes;
+
+import scala.Tuple2;
+
+/**
+ */
+public class SparkCubing extends AbstractApplication {
+
+ protected static final Logger logger = LoggerFactory.getLogger(SparkCubing.class);
+
+ private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
+ private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+ private static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
+ private static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
+ private static final Option OPTION_COPROCESSOR = OptionBuilder.withArgName("coprocessor").hasArg().isRequired(true).withDescription("Coprocessor Jar Path").create("coprocessor");
+
+ private Options options;
+
+ public SparkCubing() {
+ options = new Options();
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_CONF_PATH);
+ options.addOption(OPTION_COPROCESSOR);
+
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ public static KylinConfig loadKylinPropsAndMetadata(String folder) throws IOException {
+ File metaDir = new File(folder);
+ if (!metaDir.getAbsolutePath().equals(System.getProperty(KylinConfig.KYLIN_CONF))) {
+ System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
+ logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ System.out.println("setting metadataUrl to " + metaDir.getAbsolutePath());
+ kylinConfig.setMetadataUrl(metaDir.getAbsolutePath());
+ return kylinConfig;
+ } else {
+ return KylinConfig.getInstanceFromEnv();
+ }
+ }
+
+ private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception {
+ ClassUtil.addClasspath(confPath);
+ final File[] files = new File(confPath).listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File pathname) {
+ if (pathname.getAbsolutePath().endsWith(".xml")) {
+ return true;
+ }
+ if (pathname.getAbsolutePath().endsWith(".properties")) {
+ return true;
+ }
+ return false;
+ }
+ });
+ if (files == null) {
+ return;
+ }
+ for (File file : files) {
+ sc.addFile(file.getAbsolutePath());
+ }
+ }
+
+ private void writeDictionary(DataFrame intermediateTable, String cubeName, String segmentId) throws Exception {
+ final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+ final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
+ final String[] columns = intermediateTable.columns();
+ final CubeSegment seg = cubeInstance.getSegmentById(segmentId);
+ final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
+ final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(seg), cubeDesc);
+ final List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
+ final long start = System.currentTimeMillis();
+ final RowKeyDesc rowKey = cubeDesc.getRowkey();
+ for (int i = 0; i < baseCuboidColumn.size(); i++) {
+ TblColRef col = baseCuboidColumn.get(i);
+ if (!rowKey.isUseDictionary(col)) {
+ continue;
+ }
+ final int rowKeyColumnIndex = flatDesc.getRowKeyColumnIndexes()[i];
+ tblColRefMap.put(rowKeyColumnIndex, col);
+ }
+
+ Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();
+ for (Map.Entry<Integer, TblColRef> entry : tblColRefMap.entrySet()) {
+ final String column = columns[entry.getKey()];
+ final TblColRef tblColRef = entry.getValue();
+ final DataFrame frame = intermediateTable.select(column).distinct();
+
+ final Row[] rows = frame.collect();
+ dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionary(tblColRef.getType(), new IterableDictionaryValueEnumerator(new Iterable<String>() {
+ @Override
+ public Iterator<String> iterator() {
+ return new Iterator<String>() {
+ int i = 0;
+
+ @Override
+ public boolean hasNext() {
+ return i < rows.length;
+ }
+
+ @Override
+ public String next() {
+ if (hasNext()) {
+ final Row row = rows[i++];
+ final Object o = row.get(0);
+ return o != null ? o.toString() : null;
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ })));
+ }
+ final long end = System.currentTimeMillis();
+ CubingUtils.writeDictionary(seg, dictionaryMap, start, end);
+ try {
+ CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
+ cubeBuilder.setToUpdateSegs(seg);
+ cubeManager.updateCube(cubeBuilder);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to deal with the request: " + e.getLocalizedMessage());
+ }
+ }
+
+ private Map<Long, HLLCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception {
+ CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+ CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+ CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
+ List<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds();
+ final HashMap<Long, HLLCounter> zeroValue = Maps.newHashMap();
+ for (Long id : allCuboidIds) {
+ zeroValue.put(id, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
+ }
+
+ CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+
+ final int[] rowKeyColumnIndexes = flatDesc.getRowKeyColumnIndexes();
+ final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+ final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
+ final ByteArray[] row_hashcodes = new ByteArray[nRowKey];
+
+ for (Long cuboidId : allCuboidIds) {
+ Integer[] cuboidBitSet = new Integer[Long.bitCount(cuboidId)];
+
+ long mask = Long.highestOneBit(baseCuboidId);
+ int position = 0;
+ for (int i = 0; i < nRowKey; i++) {
+ if ((mask & cuboidId) > 0) {
+ cuboidBitSet[position] = i;
+ position++;
+ }
+ mask = mask >> 1;
+ }
+ allCuboidsBitSet.put(cuboidId, cuboidBitSet);
+ }
+ for (int i = 0; i < nRowKey; ++i) {
+ row_hashcodes[i] = new ByteArray();
+ }
+
+ final HashMap<Long, HLLCounter> samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2<HashMap<Long, HLLCounter>, List<String>, HashMap<Long, HLLCounter>>() {
+
+ final HashFunction hashFunction = Hashing.murmur3_128();
+
+ @Override
+ public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, List<String> v2) throws Exception {
+ for (int i = 0; i < nRowKey; i++) {
+ Hasher hc = hashFunction.newHasher();
+ String colValue = v2.get(rowKeyColumnIndexes[i]);
+ if (colValue != null) {
+ row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
+ } else {
+ row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
+ }
+ }
+
+ for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
+ Hasher hc = hashFunction.newHasher();
+ HLLCounter counter = v1.get(entry.getKey());
+ final Integer[] cuboidBitSet = entry.getValue();
+ for (int position = 0; position < cuboidBitSet.length; position++) {
+ hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
+ }
+ counter.add(hc.hash().asBytes());
+ }
+ return v1;
+ }
+ }, new Function2<HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>>() {
+ @Override
+ public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, HashMap<Long, HLLCounter> v2) throws Exception {
+ Preconditions.checkArgument(v1.size() == v2.size());
+ Preconditions.checkArgument(v1.size() > 0);
+ for (Map.Entry<Long, HLLCounter> entry : v1.entrySet()) {
+ final HLLCounter counter1 = entry.getValue();
+ final HLLCounter counter2 = v2.get(entry.getKey());
+ counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
+ }
+ return v1;
+ }
+
+ });
+ return samplingResult;
+ }
+
+ /** return hfile location */
+ private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId, final byte[][] splitKeys) throws Exception {
+ CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+ CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+ List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
+ final Map<TblColRef, Integer> columnLengthMap = Maps.newHashMap();
+ final CubeDimEncMap dimEncMap = cubeSegment.getDimensionEncodingMap();
+ for (TblColRef tblColRef : baseCuboidColumn) {
+ columnLengthMap.put(tblColRef, dimEncMap.get(tblColRef).getLengthOfEncoding());
+ }
+ final Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();
+ for (DimensionDesc dim : cubeDesc.getDimensions()) {
+ // dictionary
+ for (TblColRef col : dim.getColumnRefs()) {
+ if (cubeDesc.getRowkey().isUseDictionary(col)) {
+ Dictionary<String> dict = cubeSegment.getDictionary(col);
+ if (dict == null) {
+ System.err.println("Dictionary for " + col + " was not found.");
+ continue;
+ }
+ dictionaryMap.put(col, dict);
+ System.out.println("col:" + col + " dictionary size:" + dict.getSize());
+ }
+ }
+ }
+
+ for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+ FunctionDesc func = measureDesc.getFunction();
+ List<TblColRef> colRefs = func.getMeasureType().getColumnsNeedDictionary(func);
+ for (TblColRef col : colRefs) {
+ dictionaryMap.put(col, cubeSegment.getDictionary(col));
+ }
+ }
+
+ final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
+
+ @Override
+ public Iterator<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator) throws Exception {
+ long t = System.currentTimeMillis();
+ prepare();
+
+ final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+
+ LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue();
+ System.out.println("load properties finished");
+ IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
+ AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap);
+ final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeInstance.getSegmentById(segmentId), columnLengthMap));
+ Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter));
+ try {
+ while (listIterator.hasNext()) {
+ for (List<String> row : listIterator.next()) {
+ blockingQueue.put(row);
+ }
+ }
+ blockingQueue.put(Collections.<String> emptyList());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ System.out.println("build partition cost: " + (System.currentTimeMillis() - t) + "ms");
+ return sparkCuboidWriter.getResult().iterator();
+ }
+ });
+
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ Configuration conf = getConfigurationForHFile(cubeSegment.getStorageLocationIdentifier());
+ Path path = new Path(kylinConfig.getHdfsWorkingDirectory(), "hfile_" + UUID.randomUUID().toString());
+ Preconditions.checkArgument(!FileSystem.get(conf).exists(path));
+ String url = conf.get("fs.defaultFS") + path.toString();
+ System.out.println("use " + url + " as hfile");
+ List<MeasureDesc> measuresDescs = cubeDesc.getMeasures();
+ final int measureSize = measuresDescs.size();
+ final String[] dataTypes = new String[measureSize];
+ for (int i = 0; i < dataTypes.length; i++) {
+ dataTypes[i] = measuresDescs.get(i).getFunction().getReturnType();
+ }
+ final MeasureAggregators aggs = new MeasureAggregators(measuresDescs);
+ writeToHFile2(javaPairRDD, dataTypes, measureSize, aggs, splitKeys, conf, url);
+ return url;
+ }
+
+ private void writeToHFile2(final JavaPairRDD<byte[], byte[]> javaPairRDD, final String[] dataTypes, final int measureSize, final MeasureAggregators aggs, final byte[][] splitKeys, final Configuration conf, final String hFileLocation) {
+ javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
+ @Override
+ public int numPartitions() {
+ return splitKeys.length + 1;
+ }
+
+ @Override
+ public int getPartition(Object key) {
+ Preconditions.checkArgument(key instanceof byte[]);
+ for (int i = 0, n = splitKeys.length; i < n; ++i) {
+ if (UnsignedBytes.lexicographicalComparator().compare((byte[]) key, splitKeys[i]) < 0) {
+ return i;
+ }
+ }
+ return splitKeys.length;
+ }
+ }, UnsignedBytes.lexicographicalComparator()).mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Tuple2<byte[], byte[]>>() {
+ @Override
+ public Iterator<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception {
+ Iterable<Tuple2<byte[], byte[]>> iterable = new Iterable<Tuple2<byte[], byte[]>>() {
+ final BufferedMeasureCodec codec = new BufferedMeasureCodec(dataTypes);
+ final Object[] input = new Object[measureSize];
+ final Object[] result = new Object[measureSize];
+
+ @Override
+ public Iterator<Tuple2<byte[], byte[]>> iterator() {
+ return IteratorUtils.merge(tuple2Iterator, UnsignedBytes.lexicographicalComparator(), new Function<Iterable<byte[]>, byte[]>() {
+ @Override
+ public byte[] call(Iterable<byte[]> v1) throws Exception {
+ final LinkedList<byte[]> list = Lists.newLinkedList(v1);
+ if (list.size() == 1) {
+ return list.get(0);
+ }
+ aggs.reset();
+ for (byte[] v : list) {
+ codec.decode(ByteBuffer.wrap(v), input);
+ aggs.aggregate(input);
+ }
+ aggs.collectStates(result);
+ ByteBuffer buffer = codec.encode(result);
+ byte[] bytes = new byte[buffer.position()];
+ System.arraycopy(buffer.array(), buffer.arrayOffset(), bytes, 0, buffer.position());
+ return bytes;
+ }
+ });
+ }
+ };
+ return iterable.iterator();
+ }
+ }, true).mapToPair(new PairFunction<Tuple2<byte[], byte[]>, ImmutableBytesWritable, KeyValue>() {
+ @Override
+ public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<byte[], byte[]> tuple2) throws Exception {
+ ImmutableBytesWritable key = new ImmutableBytesWritable(tuple2._1());
+ KeyValue value = new KeyValue(tuple2._1(), "F1".getBytes(), "M".getBytes(), tuple2._2());
+ return new Tuple2(key, value);
+ }
+ }).saveAsNewAPIHadoopFile(hFileLocation, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, conf);
+ }
+
+ public static void prepare() throws Exception {
+ final File file = new File(SparkFiles.get("kylin.properties"));
+ final String confPath = file.getParentFile().getAbsolutePath();
+ System.out.println("conf directory:" + confPath);
+ System.setProperty(KylinConfig.KYLIN_CONF, confPath);
+ ClassUtil.addClasspath(confPath);
+ }
+
+ private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HLLCounter> samplingResult) throws Exception {
+ final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+ final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+ final Map<Long, Long> rowCountMap = CubeStatsReader.getCuboidRowCountMapFromSampling(samplingResult, 100);
+ final Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSegment, rowCountMap);
+ System.out.println("cube size estimation:" + cubeSizeMap);
+ final byte[][] splitKeys = CreateHTableJob.getRegionSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig, cubeSegment, null); //FIXME: passing non-null value for 'hfileSplitsOutputFolder'
+ CubeHTableUtil.createHTable(cubeSegment, splitKeys);
+ System.out.println(cubeSegment.getStorageLocationIdentifier() + " table created");
+ return splitKeys;
+ }
+
+ private Configuration getConfigurationForHFile(String hTableName) throws IOException {
+ final Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
+ Job job = Job.getInstance(conf);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(KeyValue.class);
+ Connection connection = HBaseConnection.get();
+ Table table = connection.getTable(TableName.valueOf(hTableName));
+ HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf(hTableName)));
+ return conf;
+ }
+
+ private void bulkLoadHFile(String cubeName, String segmentId, String hfileLocation) throws Exception {
+ final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+ final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+ final Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
+
+ FsShell shell = new FsShell(hbaseConf);
+ try {
+ shell.run(new String[] { "-chmod", "-R", "777", hfileLocation });
+ } catch (Exception e) {
+ logger.error("Couldnt change the file permissions ", e);
+ throw new IOException(e);
+ }
+
+ String[] newArgs = new String[2];
+ newArgs[0] = hfileLocation;
+ newArgs[1] = cubeSegment.getStorageLocationIdentifier();
+
+ int ret = ToolRunner.run(new LoadIncrementalHFiles(hbaseConf), newArgs);
+ System.out.println("incremental load result:" + ret);
+
+ cubeSegment.setStatus(SegmentStatusEnum.READY);
+ try {
+ CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
+ cubeInstance.setStatus(RealizationStatusEnum.READY);
+ cubeSegment.setStatus(SegmentStatusEnum.READY);
+ cubeBuilder.setToUpdateSegs(cubeSegment);
+ CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to deal with the request: " + e.getLocalizedMessage());
+ }
+ }
+
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+ SparkConf conf = new SparkConf().setAppName("Simple Application");
+ //memory conf
+ conf.set("spark.executor.memory", "6g");
+ conf.set("spark.storage.memoryFraction", "0.3");
+
+ //serialization conf
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+ conf.set("spark.kryo.registrationRequired", "true");
+
+ JavaSparkContext sc = new JavaSparkContext(conf);
+ HiveContext sqlContext = new HiveContext(sc.sc());
+ final DataFrame intermediateTable = sqlContext.sql("select * from " + hiveTable);
+ final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+ final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+ final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
+ final String coprocessor = optionsHelper.getOptionValue(OPTION_COPROCESSOR);
+ final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ kylinConfig.overrideCoprocessorLocalJar(coprocessor);
+
+ setupClasspath(sc, confPath);
+ intermediateTable.cache();
+ writeDictionary(intermediateTable, cubeName, segmentId);
+ final JavaRDD<List<String>> rowJavaRDD = intermediateTable.javaRDD().map(new org.apache.spark.api.java.function.Function<Row, List<String>>() {
+ @Override
+ public List<String> call(Row v1) throws Exception {
+ ArrayList<String> result = Lists.newArrayListWithExpectedSize(v1.size());
+ for (int i = 0; i < v1.size(); i++) {
+ final Object o = v1.get(i);
+ if (o != null) {
+ result.add(o.toString());
+ } else {
+ result.add(null);
+ }
+ }
+ return result;
+
+ }
+ });
+
+ final Map<Long, HLLCounter> samplingResult = sampling(rowJavaRDD, cubeName, segmentId);
+ final byte[][] splitKeys = createHTable(cubeName, segmentId, samplingResult);
+
+ final String hfile = build(rowJavaRDD, cubeName, segmentId, splitKeys);
+ bulkLoadHFile(cubeName, segmentId, hfile);
+ }
+
+}
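Not part of the commit, but a small self-contained sketch of the routing logic used by the anonymous Partitioner in writeToHFile2 above: a row key is assigned to the first region split key that is lexicographically greater than it, and anything at or beyond the last split key lands in the final partition (one partition per target HBase region). It assumes only Guava on the classpath; the class and method names are illustrative.

    import com.google.common.primitives.UnsignedBytes;

    public class SplitKeyRoutingDemo {

        // mirrors Partitioner.getPartition(): linear scan over the region split keys
        static int partitionFor(byte[] key, byte[][] splitKeys) {
            for (int i = 0; i < splitKeys.length; i++) {
                if (UnsignedBytes.lexicographicalComparator().compare(key, splitKeys[i]) < 0) {
                    return i;
                }
            }
            return splitKeys.length; // keys >= the last split key go to the last region
        }

        public static void main(String[] args) {
            byte[][] splits = { { 0x20 }, { 0x40 } };
            System.out.println(partitionFor(new byte[] { 0x10 }, splits)); // 0
            System.out.println(partitionFor(new byte[] { 0x30 }, splits)); // 1
            System.out.println(partitionFor(new byte[] { 0x50 }, splits)); // 2
        }
    }

Sorting within each partition (repartitionAndSortWithinPartitions with the same unsigned-byte comparator) is what lets HFileOutputFormat2 write each region's HFile in key order.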
diff --git a/pom.xml b/pom.xml
index d9b9efe..9812c2d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,21 +39,21 @@
<properties>
<!-- General Properties -->
- <javaVersion>1.7</javaVersion>
+ <javaVersion>1.8</javaVersion>
<maven-model.version>3.3.9</maven-model.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- Hadoop versions -->
- <hadoop2.version>2.7.1</hadoop2.version>
- <yarn.version>2.7.1</yarn.version>
+ <hadoop2.version>3.0.0-alpha2</hadoop2.version>
+ <yarn.version>3.0.0-alpha2</yarn.version>
<!-- Hive versions -->
- <hive.version>1.2.1</hive.version>
- <hive-hcatalog.version>1.2.1</hive-hcatalog.version>
+ <hive.version>2.1.0</hive.version>
+ <hive-hcatalog.version>2.1.0</hive-hcatalog.version>
<!-- HBase versions -->
- <hbase-hadoop2.version>1.1.1</hbase-hadoop2.version>
+ <hbase-hadoop2.version>2.0.0-SNAPSHOT</hbase-hadoop2.version>
<!-- Kafka versions -->
<kafka.version>1.0.0</kafka.version>
@@ -62,6 +62,7 @@
<spark.version>2.1.2</spark.version>
<kryo.version>4.0.0</kryo.version>
+ <commons-configuration.version>1.6</commons-configuration.version>
<!-- <reflections.version>0.9.10</reflections.version> -->
<!-- Calcite Version -->
@@ -565,6 +566,12 @@
<version>${yarn.version}</version>
</dependency>
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <version>${commons-configuration.version}</version>
+ </dependency>
+
<!-- Calcite dependencies -->
<dependency>
<groupId>org.apache.calcite</groupId>
diff --git a/server-base/pom.xml b/server-base/pom.xml
index baa6433..6f2f493 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -194,6 +194,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<repositories>
diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java b/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java
index 9eb9bb7..fd53b5b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java
@@ -43,6 +43,8 @@ import java.util.TreeMap;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
@@ -51,7 +53,6 @@ import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.CompareFilter;
@@ -98,7 +100,7 @@ public class MockHTable implements Table {
private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) {
+ private static List<Cell> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) {
return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions);
}
@@ -163,8 +165,8 @@ public class MockHTable implements Table {
throw new RuntimeException(this.getClass() + " does NOT implement this method.");
}
- private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) {
- List<KeyValue> ret = new ArrayList<KeyValue>();
+ private static List<Cell> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) {
+ List<Cell> ret = new ArrayList<>();
for (byte[] family : rowdata.keySet())
for (byte[] qualifier : rowdata.get(family).keySet()) {
int versionsAdded = 0;
@@ -208,7 +210,6 @@ public class MockHTable implements Table {
/**
* {@inheritDoc}
*/
- @Override
public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
Object[] results = new Object[actions.size()]; // same size.
for (int i = 0; i < actions.size(); i++) {
@@ -242,11 +243,6 @@ public class MockHTable implements Table {
}
- @Override
- public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException, InterruptedException {
- return new Object[0];
- }
-
/**
* {@inheritDoc}
*/
@@ -255,7 +251,7 @@ public class MockHTable implements Table {
if (!data.containsKey(get.getRow()))
return new Result();
byte[] row = get.getRow();
- List<KeyValue> kvs = new ArrayList<KeyValue>();
+ List<Cell> kvs = new ArrayList<>();
if (!get.hasFamilies()) {
kvs = toKeyValue(row, data.get(row), get.getMaxVersions());
} else {
@@ -280,7 +276,7 @@ public class MockHTable implements Table {
kvs = filter(filter, kvs);
}
- return new Result(kvs);
+ return Result.create(kvs);
}
/**
@@ -318,11 +314,11 @@ public class MockHTable implements Table {
break;
}
- List<KeyValue> kvs = null;
+ List<Cell> kvs = null;
if (!scan.hasFamilies()) {
kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), scan.getMaxVersions());
} else {
- kvs = new ArrayList<KeyValue>();
+ kvs = new ArrayList<>();
for (byte[] family : scan.getFamilyMap().keySet()) {
if (data.get(row).get(family) == null)
continue;
@@ -354,7 +350,7 @@ public class MockHTable implements Table {
}
}
if (!kvs.isEmpty()) {
- ret.add(new Result(kvs));
+ ret.add(Result.create(kvs));
}
}
@@ -389,12 +385,14 @@ public class MockHTable implements Table {
public void close() {
}
+ @Override
public boolean renewLease() {
- throw new RuntimeException(this.getClass() + " does NOT implement this method.");
+ return false;
}
+ @Override
public ScanMetrics getScanMetrics() {
- throw new RuntimeException(this.getClass() + " does NOT implement this method.");
+ return null;
}
};
}
@@ -406,10 +404,10 @@ public class MockHTable implements Table {
* @param kvs List of a row's KeyValues
* @return List of KeyValues that were not filtered.
*/
- private List<KeyValue> filter(Filter filter, List<KeyValue> kvs) throws IOException {
+ private List<Cell> filter(Filter filter, List<Cell> kvs) throws IOException {
filter.reset();
- List<KeyValue> tmp = new ArrayList<KeyValue>(kvs.size());
+ List<Cell> tmp = new ArrayList<>(kvs.size());
tmp.addAll(kvs);
/*
@@ -418,9 +416,9 @@ public class MockHTable implements Table {
* See Figure 4-2 on p. 163.
*/
boolean filteredOnRowKey = false;
- List<KeyValue> nkvs = new ArrayList<KeyValue>(tmp.size());
- for (KeyValue kv : tmp) {
- if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
+ List<Cell> nkvs = new ArrayList<>(tmp.size());
+ for (Cell kv : tmp) {
+ if (filter.filterRowKey(kv)) {
filteredOnRowKey = true;
break;
}
@@ -483,16 +481,16 @@ public class MockHTable implements Table {
public void put(Put put) throws IOException {
byte[] row = put.getRow();
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR));
- for (byte[] family : put.getFamilyMap().keySet()) {
+ for (byte[] family : put.getFamilyCellMap().keySet()) {
if (columnFamilies.contains(new String(family)) == false) {
throw new RuntimeException("Not Exists columnFamily : " + new String(family));
}
NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR));
- for (KeyValue kv : put.getFamilyMap().get(family)) {
- kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis()));
- byte[] qualifier = kv.getQualifier();
+ for (Cell kv : put.getFamilyCellMap().get(family)) {
+ CellUtil.updateLatestStamp(kv, System.currentTimeMillis());
+ byte[] qualifier = kv.getQualifierArray();
NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>());
- qualifierData.put(kv.getTimestamp(), kv.getValue());
+ qualifierData.put(kv.getTimestamp(), kv.getValueArray());
}
}
}
@@ -540,22 +538,22 @@ public class MockHTable implements Table {
byte[] row = delete.getRow();
if (data.get(row) == null)
return;
- if (delete.getFamilyMap().size() == 0) {
+ if (delete.getFamilyCellMap().size() == 0) {
data.remove(row);
return;
}
- for (byte[] family : delete.getFamilyMap().keySet()) {
+ for (byte[] family : delete.getFamilyCellMap().keySet()) {
if (data.get(row).get(family) == null)
continue;
- if (delete.getFamilyMap().get(family).isEmpty()) {
+ if (delete.getFamilyCellMap().get(family).isEmpty()) {
data.get(row).remove(family);
continue;
}
- for (KeyValue kv : delete.getFamilyMap().get(family)) {
- if (kv.isDelete()) {
- data.get(row).get(kv.getFamily()).clear();
+ for (Cell kv : delete.getFamilyCellMap().get(family)) {
+ if (CellUtil.isDelete(kv)) {
+ data.get(row).get(kv.getFamilyArray()).clear();
} else {
- data.get(row).get(kv.getFamily()).remove(kv.getQualifier());
+ data.get(row).get(kv.getFamilyArray()).remove(kv.getQualifierArray());
}
}
if (data.get(row).get(family).isEmpty()) {
@@ -675,40 +673,48 @@ public class MockHTable implements Table {
}
- public void setOperationTimeout(int operationTimeout) {
- throw new RuntimeException(this.getClass() + " does NOT implement this method.");
+ /***
+ *
+ * All values are default
+ *
+ * **/
+ @Override
+ public void setOperationTimeout(int i) {
+
}
+ @Override
public int getOperationTimeout() {
- throw new RuntimeException(this.getClass() + " does NOT implement this method.");
+ return 0;
}
- /** @deprecated */
- @Deprecated
+ @Override
public int getRpcTimeout() {
- throw new RuntimeException(this.getClass() + " does NOT implement this method.");
+ return 0;
}
- /** @deprecated */
- @Deprecated
- public void setRpcTimeout(int rpcTimeout) {
- throw new RuntimeException(this.getClass() + " does NOT implement this method.");
- }
+ @Override
+ public void setRpcTimeout(int i) {
- public int getWriteRpcTimeout() {
- throw new RuntimeException(this.getClass() + " does NOT implement this method.");
}
- public void setWriteRpcTimeout(int writeRpcTimeout) {
- throw new RuntimeException(this.getClass() + " does NOT implement this method.");
+ @Override
+ public int getReadRpcTimeout() {
+ return 0;
}
- public int getReadRpcTimeout() {
- throw new RuntimeException(this.getClass() + " does NOT implement this method.");
+ @Override
+ public void setReadRpcTimeout(int i) {
+
}
- public void setReadRpcTimeout(int readRpcTimeout) {
- throw new RuntimeException(this.getClass() + " does NOT implement this method.");
+ @Override
+ public int getWriteRpcTimeout() {
+ return 0;
}
+ @Override
+ public void setWriteRpcTimeout(int i) {
+
+ }
}
\ No newline at end of file
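A note on the Cell accessors used in the rewritten MockHTable: HBase 2 removes KeyValue#getFamily/getQualifier/getValue, and the low-level replacements (getFamilyArray, getQualifierArray, getValueArray) return the whole backing byte array rather than the single component, so any copy has to honour the matching offset and length. CellUtil already provides that. A minimal sketch, assuming only an HBase 2.x client on the classpath:

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;

    // Copies exactly getQualifierLength() bytes starting at getQualifierOffset();
    // reading getQualifierArray() directly would also pick up row, family and value bytes.
    static byte[] qualifierOf(Cell cell) {
        return CellUtil.cloneQualifier(cell);
    }

The same applies to CellUtil.cloneFamily and CellUtil.cloneValue, which is what the put() and delete() hunks above rely on.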
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index 53e8a68..0f71797 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -239,6 +239,11 @@ public class HBaseConnection {
// ============================================================================
+ public static Connection get() {
+ StorageURL url = KylinConfig.getInstanceFromEnv().getStorageUrl();
+ return get(url);
+ }
+
// returned Connection can be shared by multiple threads and does not require close()
@SuppressWarnings("resource")
public static Connection get(StorageURL url) {
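The new zero-argument HBaseConnection.get() resolves the storage URL from the active KylinConfig and delegates to the existing get(StorageURL) overload. A hedged usage sketch (the table name is hypothetical); the returned Connection is shared, so callers close only the Table:

    Connection conn = HBaseConnection.get();   // reads kylin.storage.url from the current KylinConfig
    try (Table aclTable = conn.getTable(TableName.valueOf("kylin_metadata_acl"))) {   // hypothetical table name
        // reads and writes go here; never close the shared Connection itself
    }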
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index a8f4fd8..48dce1f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -18,11 +18,8 @@
package org.apache.kylin.storage.hbase.cube.v2;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
@@ -47,8 +44,10 @@ import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
/**
* for test use only
@@ -181,7 +180,7 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
public List<Cell> next() {
List<Cell> result = allResultsIterator.next().listCells();
for (Cell cell : result) {
- scannedBytes += CellUtil.estimatedSizeOf(cell);
+ scannedBytes += CellUtil.estimatedSerializedSizeOf(cell);
}
scannedRows++;
return result;
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index fd54e2b..89fe56d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -38,9 +38,9 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig;
import org.apache.kylin.common.exceptions.KylinTimeoutException;
@@ -178,7 +178,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
List<Cell> result = delegate.next();
rowCount++;
for (Cell cell : result) {
- rowBytes += CellUtil.estimatedSizeOf(cell);
+ rowBytes += CellUtil.estimatedSerializedSizeOf(cell);
}
return result;
}
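Both scan paths above switch byte accounting from CellUtil.estimatedSizeOf, which no longer exists in HBase 2, to CellUtil.estimatedSerializedSizeOf, and the coprocessor now imports ResponseConverter from the shaded protobuf package shipped with HBase 2. A minimal sketch of the accounting loop, assuming an HBase 2.x client:

    long scannedBytes = 0L;
    for (Cell cell : result) {
        // estimatedSerializedSizeOf replaces the removed estimatedSizeOf
        scannedBytes += CellUtil.estimatedSerializedSizeOf(cell);
    }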
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index 5ffdd48..01158a7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -27,6 +27,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
@@ -58,6 +63,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
public int run(String[] args) throws Exception {
Options options = new Options();
+ Connection connection = null;
try {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_CUBE_NAME);
@@ -93,6 +99,10 @@ public class CubeHFileJob extends AbstractHadoopJob {
HTable htable = new HTable(configuration, getOptionValue(OPTION_HTABLE_NAME));
+ String hTableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
+ connection = ConnectionFactory.createConnection(hbaseConf);
+ Table table = connection.getTable(TableName.valueOf(hTableName));
+ RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(hTableName));
// Automatic config !
HFileOutputFormat3.configureIncrementalLoad(job, htable);
reconfigurePartitions(configuration, partitionFilePath);
@@ -113,6 +123,8 @@ public class CubeHFileJob extends AbstractHadoopJob {
} finally {
if (job != null)
cleanupTempConfFile(job.getConfiguration());
+ if (null != connection)
+ connection.close();
}
}
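CubeHFileJob now builds a Connection, Table and RegionLocator because the HTable-based bulk-load configuration is gone from HBase 2, although this hunk still feeds the deprecated HTable into Kylin's HFileOutputFormat3 copy. For comparison, a hedged sketch of the stock HBase 2 wiring (HFileOutputFormat2 is the upstream class, not Kylin's local copy):

    try (Connection connection = ConnectionFactory.createConnection(hbaseConf);
         Table table = connection.getTable(TableName.valueOf(hTableName));
         RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(hTableName))) {
        // derives split points, compression and bloom settings from the live table
        HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
    }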
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
new file mode 100644
index 0000000..afc2b4c
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.gridtable.GTRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class HBaseCuboidWriter implements ICuboidWriter {
+
+ private static final Logger logger = LoggerFactory.getLogger(HBaseCuboidWriter.class);
+
+ private static final int BATCH_PUT_THRESHOLD = 10000;
+
+ private final List<KeyValueCreator> keyValueCreators;
+ private final int nColumns;
+ private final Table hTable;
+ private final CubeDesc cubeDesc;
+ private final CubeSegment cubeSegment;
+ private final Object[] measureValues;
+
+ private List<Put> puts = Lists.newArrayList();
+ private AbstractRowKeyEncoder rowKeyEncoder;
+ private byte[] keybuf;
+
+ public HBaseCuboidWriter(CubeSegment segment, Table hTable) {
+ this.keyValueCreators = Lists.newArrayList();
+ this.cubeSegment = segment;
+ this.cubeDesc = cubeSegment.getCubeDesc();
+ for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+ keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
+ }
+ }
+ this.nColumns = keyValueCreators.size();
+ this.hTable = hTable;
+ this.measureValues = new Object[cubeDesc.getMeasures().size()];
+ }
+
+ private byte[] copy(byte[] array, int offset, int length) {
+ byte[] result = new byte[length];
+ System.arraycopy(array, offset, result, 0, length);
+ return result;
+ }
+
+ // TODO: sharding on streaming
+ private byte[] createKey(Long cuboidId, GTRecord record) {
+ if (rowKeyEncoder == null || rowKeyEncoder.getCuboidID() != cuboidId) {
+ rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment,
+ Cuboid.findForMandatory(cubeDesc, cuboidId));
+ keybuf = rowKeyEncoder.createBuf();
+ }
+ rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keybuf);
+ return keybuf;
+
+ }
+
+ @Override
+ public void write(long cuboidId, GTRecord record) throws IOException {
+ byte[] key = createKey(cuboidId, record);
+ final Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, cuboidId);
+ final int nDims = cuboid.getColumns().size();
+ final ImmutableBitSet bitSet = new ImmutableBitSet(nDims, nDims + cubeDesc.getMeasures().size());
+
+ for (int i = 0; i < nColumns; i++) {
+ final Object[] values = record.getValues(bitSet, measureValues);
+ final KeyValue keyValue = keyValueCreators.get(i).create(key, 0, key.length, values);
+ final Put put = new Put(copy(key, 0, key.length));
+ byte[] family = copy(keyValue.getFamilyArray(), keyValue.getFamilyOffset(), keyValue.getFamilyLength());
+ byte[] qualifier = copy(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength());
+ byte[] value = copy(keyValue.getValueArray(), keyValue.getValueOffset(), keyValue.getValueLength());
+ put.addColumn(family, qualifier, value);
+ puts.add(put);
+ }
+ if (puts.size() >= BATCH_PUT_THRESHOLD) {
+ flush();
+ }
+ }
+
+ @Override
+ public final void flush() throws IOException {
+ if (!puts.isEmpty()) {
+ long t = System.currentTimeMillis();
+ if (hTable != null) {
+ hTable.put(puts);
+ }
+ logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
+ puts.clear();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ flush();
+ IOUtils.closeQuietly(hTable);
+ }
+
+}
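HBaseCuboidWriter buffers one Put per mapped HBase column and flushes the batch once BATCH_PUT_THRESHOLD (10000) puts have accumulated; close() drains whatever is left and quietly closes the Table. A hedged usage sketch, assuming segment and record come from the in-memory cubing pipeline and that error handling is omitted:

    Table hTable = HBaseConnection.get().getTable(TableName.valueOf(segment.getStorageLocationIdentifier()));
    ICuboidWriter writer = new HBaseCuboidWriter(segment, hTable);
    writer.write(cuboidId, record);   // buffered; nothing hits HBase until the threshold or a flush()
    writer.close();                   // flushes the tail of the buffer and closes the Table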
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index 292d9d6..b9d5599 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -459,7 +459,7 @@ public class CubeMigrationCLI {
value = Bytes.toBytes(valueString);
}
Put put = new Put(Bytes.toBytes(cubeId));
- put.add(family, column, value);
+ put.addColumn(family, column, value);
destAclHtable.put(put);
}
}
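The remaining storage-hbase utility edits are mechanical: Put#add(byte[], byte[], byte[]) was removed in HBase 2 and addColumn is the drop-in replacement. The same substitution repeats in ExtendCubeToHybridCLI, GridTableHBaseBenchmark and the tool module below. A one-line sketch with hypothetical variables:

    Put put = new Put(rowKey);
    put.addColumn(family, qualifier, value);   // replaces the removed put.add(family, qualifier, value)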
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index 23ec77f..46363b2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinVersion;
import org.apache.kylin.common.util.Bytes;
@@ -501,7 +500,7 @@ public class DeployCoprocessorCLI {
Matcher keyMatcher;
Matcher valueMatcher;
- for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) {
+ for (Map.Entry<org.apache.hadoop.hbase.util.Bytes, org.apache.hadoop.hbase.util.Bytes> e : tableDescriptor.getValues().entrySet()) {
keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
if (!keyMatcher.matches()) {
continue;
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
index 092023e..0f9466c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
@@ -249,7 +249,7 @@ public class ExtendCubeToHybridCLI {
value = Bytes.toBytes(valueString);
}
Put put = new Put(Bytes.toBytes(newCubeId));
- put.add(family, column, value);
+ put.addColumn(family, column, value);
aclHtable.put(put);
}
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
index a317110..3f034cf 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
@@ -233,7 +233,7 @@ public class GridTableHBaseBenchmark {
byte[] rowkey = Bytes.toBytes(i);
Put put = new Put(rowkey);
byte[] cell = randomBytes();
- put.add(CF, QN, cell);
+ put.addColumn(CF, QN, cell);
table.put(put);
nBytes += cell.length;
dot(i, N_ROWS);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
index bba6745..ff038d1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
@@ -50,7 +50,8 @@ public class PingHBaseCLI {
if (User.isHBaseSecurityEnabled(hconf)) {
try {
System.out.println("--------------Getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName());
- TokenUtil.obtainAndCacheToken(hconf, UserGroupInformation.getCurrentUser());
+ Connection connection = HBaseConnection.get();
+ TokenUtil.obtainAndCacheToken(connection, User.create(UserGroupInformation.getCurrentUser()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("--------------Error while getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName());
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
index eba4a37..ff2ef91 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
@@ -68,13 +68,23 @@ public class CubeHFileMapperTest {
Pair<RowKeyWritable, KeyValue> p2 = result.get(1);
assertEquals(key, p1.getFirst());
- assertEquals("cf1", new String(p1.getSecond().getFamily()));
- assertEquals("usd_amt", new String(p1.getSecond().getQualifier()));
- assertEquals("35.43", new String(p1.getSecond().getValue()));
+ assertEquals("cf1", new String(copy(p1.getSecond())));
+ assertEquals("usd_amt", new String(copy(p1.getSecond())));
+ assertEquals("35.43", new String(copy(p1.getSecond())));
assertEquals(key, p2.getFirst());
- assertEquals("cf1", new String(p2.getSecond().getFamily()));
- assertEquals("item_count", new String(p2.getSecond().getQualifier()));
- assertEquals("2", new String(p2.getSecond().getValue()));
+ assertEquals("cf1", new String(copy(p2.getSecond())));
+ assertEquals("item_count", new String(copy(p2.getSecond())));
+ assertEquals("2", new String(copy(p2.getSecond())));
+ }
+
+ private byte[] copyFamily(KeyValue kv) { return copy(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength()); }
+ private byte[] copyQualifier(KeyValue kv) { return copy(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()); }
+ private byte[] copyValue(KeyValue kv) { return copy(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()); }
+
+ private byte[] copy(byte[] array, int offset, int length) {
+ byte[] result = new byte[length];
+ System.arraycopy(array, offset, result, 0, length);
+ return result;
}
}
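The mapper test can no longer read family, qualifier and value straight off a KeyValue, so the helpers above slice the backing array by offset and length. CellUtil offers the same thing ready-made; an equivalent hedged formulation, assuming org.apache.hadoop.hbase.CellUtil is imported in the test:

    assertEquals("cf1", new String(CellUtil.cloneFamily(p1.getSecond())));
    assertEquals("usd_amt", new String(CellUtil.cloneQualifier(p1.getSecond())));
    assertEquals("35.43", new String(CellUtil.cloneValue(p1.getSecond())));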
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/TestHbaseClient.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/TestHbaseClient.java
index 2b8ecae..b77d2cb 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/TestHbaseClient.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/TestHbaseClient.java
@@ -22,8 +22,11 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.kylin.common.util.Bytes;
/**
@@ -89,13 +92,16 @@ public class TestHbaseClient {
conf.set("hbase.zookeeper.quorum", "hbase_host");
conf.set("zookeeper.znode.parent", "/hbase-unsecure");
- HTable table = new HTable(conf, "test1");
+ Connection connection = ConnectionFactory.createConnection(conf);
+
+ Table table = connection.getTable(TableName.valueOf("test1"));
Put put = new Put(Bytes.toBytes("row1"));
- put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
- put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));
+ put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
+ put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));
table.put(put);
table.close();
+ connection.close();
}
}
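The HTable(Configuration, String) constructor is gone, so every client goes through ConnectionFactory. Because Connection and Table are both Closeable, a try-with-resources form of the snippet above drops the explicit close() calls; a sketch with the same hypothetical quorum settings:

    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf("test1"))) {
        Put put = new Put(Bytes.toBytes("row1"));
        put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
        table.put(put);
    }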
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index 5426b62..3b95a50 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -174,6 +175,7 @@ public class CubeMigrationCLI extends AbstractApplication {
Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
hbaseAdmin = new HBaseAdmin(conf);
+
hdfsFS = HadoopUtil.getWorkingFileSystem();
operations = new ArrayList<Opt>();
copyFilesInMetaStore(cube);
@@ -418,10 +420,10 @@ public class CubeMigrationCLI extends AbstractApplication {
String tableName = (String) opt.params[0];
System.out.println("CHANGE_HTABLE_HOST, table name: " + tableName);
HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- hbaseAdmin.disableTable(tableName);
+ hbaseAdmin.disableTable(TableName.valueOf(tableName));
desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(tableName, desc);
- hbaseAdmin.enableTable(tableName);
+ hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+ hbaseAdmin.enableTable(TableName.valueOf(tableName));
logger.info("CHANGE_HTABLE_HOST is completed");
break;
}
@@ -580,10 +582,10 @@ public class CubeMigrationCLI extends AbstractApplication {
case CHANGE_HTABLE_HOST: {
String tableName = (String) opt.params[0];
HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- hbaseAdmin.disableTable(tableName);
+ hbaseAdmin.disableTable(TableName.valueOf(tableName));
desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(tableName, desc);
- hbaseAdmin.enableTable(tableName);
+ hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+ hbaseAdmin.enableTable(TableName.valueOf(tableName));
break;
}
case COPY_FILE_IN_META: {
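The admin calls that used to accept a table-name String now take a TableName, hence the repeated TableName.valueOf wrapping above. A compact sketch of the disable/modify/enable cycle, using tableName and desc as in the hunk:

    TableName tn = TableName.valueOf(tableName);
    hbaseAdmin.disableTable(tn);
    desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix());
    hbaseAdmin.modifyTable(tn, desc);   // the HTableDescriptor overload is deprecated but still present in HBase 2
    hbaseAdmin.enableTable(tn);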
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java
index 54fbbc0..52bad9d 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java
@@ -29,7 +29,9 @@ import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeInstance;
@@ -61,7 +63,8 @@ public class CubeMigrationCheckCLI {
private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("The name of cube migrated").create("cube");
private KylinConfig dstCfg;
- private HBaseAdmin hbaseAdmin;
+ private Admin hbaseAdmin;
+ private Connection connection;
private List<String> issueExistHTables;
private List<String> inconsistentHTables;
@@ -123,6 +126,7 @@ public class CubeMigrationCheckCLI {
}
fixInconsistent();
printIssueExistingHTables();
+ connection.close();
}
public CubeMigrationCheckCLI(KylinConfig kylinConfig, Boolean isFix) throws IOException {
@@ -130,7 +134,8 @@ public class CubeMigrationCheckCLI {
this.ifFix = isFix;
Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- hbaseAdmin = new HBaseAdmin(conf);
+ connection = ConnectionFactory.createConnection(conf);
+ hbaseAdmin = connection.getAdmin();
issueExistHTables = Lists.newArrayList();
inconsistentHTables = Lists.newArrayList();
@@ -189,10 +194,10 @@ public class CubeMigrationCheckCLI {
String[] sepNameList = segFullName.split(",");
HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(sepNameList[0]));
logger.info("Change the host of htable " + sepNameList[0] + "belonging to cube " + sepNameList[1] + " from " + desc.getValue(IRealizationConstants.HTableTag) + " to " + dstCfg.getMetadataUrlPrefix());
- hbaseAdmin.disableTable(sepNameList[0]);
+ hbaseAdmin.disableTable(TableName.valueOf(sepNameList[0]));
desc.setValue(IRealizationConstants.HTableTag, dstCfg.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(sepNameList[0], desc);
- hbaseAdmin.enableTable(sepNameList[0]);
+ hbaseAdmin.modifyTable(TableName.valueOf(sepNameList[0]), desc);
+ hbaseAdmin.enableTable(TableName.valueOf(sepNameList[0]));
}
} else {
logger.info("------ Inconsistent HTables Needed To Be Fixed ------");
diff --git a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
index 9c6cba6..b5a8440 100644
--- a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
@@ -245,7 +245,7 @@ public class ExtendCubeToHybridCLI {
value = Bytes.toBytes(valueString);
}
Put put = new Put(Bytes.toBytes(newCubeId));
- put.add(family, column, value);
+ put.addColumn(family, column, value);
aclHtable.put(put);
}
}
diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
index 16aa5ff..f6099eb 100644
--- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
+++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
@@ -22,6 +22,7 @@ package org.apache.kylin.tool;
* Created by xiefan on 17-4-20.
*/
public class StorageCleanupJob {
+
public static void main(String[] args) throws Exception {
org.apache.kylin.rest.job.StorageCleanupJob cli = new org.apache.kylin.rest.job.StorageCleanupJob();
cli.execute(args);