You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/27 11:21:40 UTC
[38/52] [abbrv] incubator-kylin git commit: KYLIN-875 Split job
module into 'core-job', 'engine-mr', 'source-hive',
'storage-hbase'. The old job remains as an assembly project.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
new file mode 100644
index 0000000..f62df42
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+/**
+ * @author George Song (ysong1)
+ *
+ */
+
+import static org.apache.hadoop.util.StringUtils.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.StringSplitter;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.cmd.ShellCmdOutput;
+import org.apache.kylin.job.common.OptionsHelper;
+import org.apache.kylin.job.exception.JobException;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for Kylin's MapReduce job drivers.
+ *
+ * <p>Provides the shared command-line {@link Option}s and parsing helpers, job jar / classpath
+ * setup, input-path expansion, shipping of Kylin properties and metadata to the tasks, and
+ * accessors for the running {@link Job} (kill, info, counters).
+ */
+@SuppressWarnings("static-access")
+public abstract class AbstractHadoopJob extends Configured implements Tool {
+    protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class);
+
+    protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For exmaple, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create("jobname");
+    protected static final Option OPTION_JOB_FLOW_ID = OptionBuilder.withArgName("job flow ID").hasArg().isRequired(true).withDescription("job flow ID").create("jobflowid");
+    protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create("cubename");
+    protected static final Option OPTION_II_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("II name. For exmaple, some_ii").create("iiname");
+    protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name)").create("segmentname");
+    protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename");
+    protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
+    protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat");
+    protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("inputdelim").hasArg().isRequired(false).withDescription("Input delimeter").create("inputdelim");
+    protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
+    protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level");
+    // NOTE(review): shares the "input" option name with OPTION_INPUT_PATH; presumably intentional
+    // (jobs use one or the other) — confirm before adding a job that needs both.
+    protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input");
+    protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename");
+    protected static final Option OPTION_KEY_COLUMN_PERCENTAGE = OptionBuilder.withArgName("rowkey column percentage").hasArg().isRequired(true).withDescription("Percentage of row key columns").create("columnpercentage");
+    protected static final Option OPTION_KEY_SPLIT_NUMBER = OptionBuilder.withArgName("key split number").hasArg().isRequired(true).withDescription("Number of key split range").create("splitnumber");
+
+    protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName("statisticsenabled").hasArg().isRequired(false).withDescription("Statistics enabled").create("statisticsenabled");
+    protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName("statisticsoutput").hasArg().isRequired(false).withDescription("Statistics output").create("statisticsoutput");
+    protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder.withArgName("statisticssamplingpercent").hasArg().isRequired(false).withDescription("Statistics sampling percentage").create("statisticssamplingpercent");
+
+    protected String name;
+    protected String description;
+    /** When true, {@link #waitForCompletion(Job)} only submits the job instead of blocking. */
+    protected boolean isAsync = false;
+    protected OptionsHelper optionsHelper = new OptionsHelper();
+
+    /** The Hadoop job built by the subclass; null until the subclass assigns it. */
+    protected Job job;
+
+    public AbstractHadoopJob() {
+        super(HadoopUtil.newHadoopJobConfiguration());
+    }
+
+    /** Parses {@code args} against {@code options} and keeps the result for the getters below. */
+    protected void parseOptions(Options options, String[] args) throws ParseException {
+        optionsHelper.parseOptions(options, args);
+    }
+
+    /** Prints a usage message for {@code options}, titled with this class's simple name. */
+    public void printUsage(Options options) {
+        optionsHelper.printUsage(getClass().getSimpleName(), options);
+    }
+
+    public Option[] getOptions() {
+        return optionsHelper.getOptions();
+    }
+
+    public String getOptionsAsString() {
+        return optionsHelper.getOptionsAsString();
+    }
+
+    protected String getOptionValue(Option option) {
+        return optionsHelper.getOptionValue(option);
+    }
+
+    protected boolean hasOption(Option option) {
+        return optionsHelper.hasOption(option);
+    }
+
+    /**
+     * Runs {@code job}. In async mode the job is only submitted and 0 is returned immediately;
+     * otherwise this blocks until completion and returns 0 on success, 1 on failure.
+     */
+    protected int waitForCompletion(Job job) throws IOException, InterruptedException, ClassNotFoundException {
+        int retVal = 0;
+        long start = System.nanoTime();
+        if (isAsync) {
+            job.submit();
+        } else {
+            job.waitForCompletion(true);
+            retVal = job.isSuccessful() ? 0 : 1;
+            logger.debug("Job '" + job.getJobName() + "' finished " + (job.isSuccessful() ? "successfully in " : "with failures. Time taken ") + formatTime((System.nanoTime() - start) / 1000000L));
+        }
+        return retVal;
+    }
+
+    /**
+     * Command-line entry helper: runs {@code job} through {@link ToolRunner} and exits the JVM with
+     * its exit code, or with 5 if it throws.
+     */
+    protected static void runJob(Tool job, String[] args) {
+        try {
+            int exitCode = ToolRunner.run(job, args);
+            System.exit(exitCode);
+        } catch (Exception e) {
+            e.printStackTrace(System.err);
+            System.exit(5);
+        }
+    }
+
+    private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
+
+    /**
+     * Sets the job jar and extends {@code mapreduce.application.classpath} with the dependency
+     * lists given in the {@code kylin.hbase.dependency} and {@code kylin.hive.dependency} system
+     * properties. If the configuration has no classpath yet, the output of
+     * {@code mapred classpath} is used as the base.
+     */
+    protected void setJobClasspath(Job job) {
+        String jarPath = KylinConfig.getInstanceFromEnv().getKylinJobJarPath();
+        File jarFile = new File(jarPath);
+        if (jarFile.exists()) {
+            job.setJar(jarPath);
+            logger.info("append job jar: " + jarPath);
+        } else {
+            job.setJarByClass(this.getClass());
+        }
+
+        String kylinHiveDependency = System.getProperty("kylin.hive.dependency");
+        String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency");
+        logger.info("append kylin.hive.dependency: " + kylinHiveDependency + " and kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH);
+
+        Configuration jobConf = job.getConfiguration();
+        String classpath = jobConf.get(MAP_REDUCE_CLASSPATH);
+        if (classpath == null || classpath.length() == 0) {
+            logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job configuration, will run 'mapred classpath' to get the default value.");
+            classpath = getDefaultMapRedClasspath();
+            logger.info("The default mapred classpath is: " + classpath);
+        }
+
+        classpath = appendToClasspath(classpath, kylinHBaseDependency);
+        classpath = appendToClasspath(classpath, kylinHiveDependency);
+
+        // Each dependency has been appended exactly once above; set the final value as-is.
+        jobConf.set(MAP_REDUCE_CLASSPATH, classpath);
+        logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH));
+    }
+
+    /**
+     * Appends a ':'-separated dependency list to a ','-separated YARN classpath. A null/empty
+     * dependency leaves the classpath unchanged; an empty classpath yields just the dependency
+     * (so no stray leading/trailing commas are produced).
+     */
+    private static String appendToClasspath(String classpath, String dependency) {
+        if (dependency == null || dependency.length() == 0) {
+            return classpath;
+        }
+        // yarn classpath is comma separated
+        String yarnDependency = dependency.replace(":", ",");
+        if (classpath == null || classpath.length() == 0) {
+            return yarnDependency;
+        }
+        return classpath + "," + yarnDependency;
+    }
+
+    /**
+     * Runs {@code mapred classpath} via the configured CLI executor and returns its output with
+     * ':' converted to ','. Returns an empty string (after logging) if the command fails.
+     */
+    private String getDefaultMapRedClasspath() {
+
+        String classpath = "";
+        try {
+            CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
+            ShellCmdOutput output = new ShellCmdOutput();
+            executor.execute("mapred classpath", output);
+
+            classpath = output.getOutput().trim().replace(':', ',');
+        } catch (IOException e) {
+            logger.error("Failed to run: 'mapred classpath'.", e);
+        }
+
+        return classpath;
+    }
+
+    /**
+     * Adds each comma-separated path in {@code input} as a job input. A path ending in "/*" is
+     * expanded: every subdirectory whose name does not start with '_' is added recursively, and if
+     * the directory has entries but no such subdirectory, the directory itself is added.
+     */
+    public void addInputDirs(String input, Job job) throws IOException {
+        for (String inp : StringSplitter.split(input, ",")) {
+            inp = inp.trim();
+            if (inp.endsWith("/*")) {
+                inp = inp.substring(0, inp.length() - 2);
+                FileSystem fs = FileSystem.get(job.getConfiguration());
+                Path path = new Path(inp);
+                FileStatus[] fileStatuses = fs.listStatus(path);
+                boolean hasDir = false;
+                for (FileStatus stat : fileStatuses) {
+                    // skip "_"-prefixed entries such as job bookkeeping directories
+                    if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
+                        hasDir = true;
+                        addInputDirs(stat.getPath().toString(), job);
+                    }
+                }
+                if (fileStatuses.length > 0 && !hasDir) {
+                    addInputDirs(path.toString(), job);
+                }
+            } else {
+                logger.debug("Add input " + inp);
+                FileInputFormat.addInputPath(job, new Path(inp));
+            }
+        }
+    }
+
+    /**
+     * Points Kylin at the "meta" directory in the current working directory: sets
+     * {@link KylinConfig#KYLIN_CONF} to it and uses it as the metadata URL.
+     */
+    public static KylinConfig loadKylinPropsAndMetadata() throws IOException {
+        File metaDir = new File("meta");
+        System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
+        logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        kylinConfig.setMetadataUrl(metaDir.getCanonicalPath());
+        return kylinConfig;
+    }
+
+    /** Ships kylin.properties plus the given table's descriptor with the job. */
+    protected void attachKylinPropsAndMetadata(TableDesc table, Configuration conf) throws IOException {
+        ArrayList<String> dumpList = new ArrayList<String>();
+        dumpList.add(table.getResourcePath());
+        attachKylinPropsAndMetadata(dumpList, conf);
+    }
+
+    /**
+     * Ships kylin.properties plus the cube, its model and cube descriptors, all tables of the
+     * model, and the dictionaries of every segment with the job.
+     *
+     * @throws IllegalStateException if a table referenced by the model has no descriptor
+     */
+    protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException {
+        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        // write cube / model_desc / cube_desc / dict / table
+        ArrayList<String> dumpList = new ArrayList<String>();
+        dumpList.add(cube.getResourcePath());
+        dumpList.add(cube.getDescriptor().getModel().getResourcePath());
+        dumpList.add(cube.getDescriptor().getResourcePath());
+
+        for (String tableName : cube.getDescriptor().getModel().getAllTables()) {
+            TableDesc table = metaMgr.getTableDesc(tableName);
+            if (table == null) {
+                throw new IllegalStateException("No table descriptor found for table " + tableName + " of cube " + cube.getName());
+            }
+            dumpList.add(table.getResourcePath());
+        }
+        for (CubeSegment segment : cube.getSegments()) {
+            dumpList.addAll(segment.getDictionaryPaths());
+        }
+
+        attachKylinPropsAndMetadata(dumpList, conf);
+    }
+
+    /**
+     * Writes kylin.properties and the listed metadata resources into a fresh local temp
+     * directory, then registers that directory under the {@code tmpfiles} configuration key so
+     * it is shipped with the job.
+     *
+     * @throws IOException if the temp directory cannot be created or a resource cannot be copied
+     */
+    protected void attachKylinPropsAndMetadata(ArrayList<String> dumpList, Configuration conf) throws IOException {
+        File tmp = File.createTempFile("kylin_job_meta", "");
+        tmp.delete(); // we need a directory, so delete the file first
+
+        File metaDir = new File(tmp, "meta");
+        if (!metaDir.mkdirs()) {
+            throw new IOException("Failed to create temporary metadata directory " + metaDir.getAbsolutePath());
+        }
+        // NOTE(review): File.deleteOnExit only removes empty directories, so this temp dir is
+        // likely left behind once populated — consider explicit cleanup.
+        metaDir.getParentFile().deleteOnExit();
+
+        // write kylin.properties
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        File kylinPropsFile = new File(metaDir, "kylin.properties");
+        kylinConfig.writeProperties(kylinPropsFile);
+
+        // write resources
+        dumpResources(kylinConfig, metaDir, dumpList);
+
+        // hadoop distributed cache
+        conf.set("tmpfiles", "file:///" + OptionsHelper.convertToFileURL(metaDir.getAbsolutePath()));
+    }
+
+    /**
+     * Copies each resource in {@code dumpList}, with its timestamp, from the configured resource
+     * store into a local store rooted at {@code metaDir}.
+     *
+     * @throws IllegalStateException if a listed resource does not exist
+     */
+    private void dumpResources(KylinConfig kylinConfig, File metaDir, ArrayList<String> dumpList) throws IOException {
+        ResourceStore from = ResourceStore.getStore(kylinConfig);
+        KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
+        ResourceStore to = ResourceStore.getStore(localConfig);
+        for (String path : dumpList) {
+            InputStream in = from.getResource(path);
+            if (in == null)
+                throw new IllegalStateException("No resource found at -- " + path);
+            try {
+                long ts = from.getResourceTimestamp(path);
+                to.putResource(path, in, ts);
+            } finally {
+                // the stream comes from the source store and must be released by us
+                in.close();
+            }
+        }
+    }
+
+    protected void deletePath(Configuration conf, Path path) throws IOException {
+        HadoopUtil.deletePath(conf, path);
+    }
+
+    /**
+     * Returns the total size of the job's input splits in MB.
+     *
+     * @throws JobException if no job has been assigned
+     * @throws IllegalArgumentException if the splits add up to 0 bytes
+     */
+    protected double getTotalMapInputMB() throws ClassNotFoundException, IOException, InterruptedException, JobException {
+        if (job == null) {
+            throw new JobException("Job is null");
+        }
+
+        long mapInputBytes = 0;
+        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
+        for (InputSplit split : input.getSplits(job)) {
+            mapInputBytes += split.getLength();
+        }
+        if (mapInputBytes == 0) {
+            throw new IllegalArgumentException("Map input splits are 0 bytes, something is wrong!");
+        }
+        double totalMapInputMB = (double) mapInputBytes / 1024 / 1024;
+        return totalMapInputMB;
+    }
+
+    /** Returns the number of input splits the job's input format produces. */
+    protected int getMapInputSplitCount() throws ClassNotFoundException, JobException, IOException, InterruptedException {
+        if (job == null) {
+            throw new JobException("Job is null");
+        }
+        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
+        return input.getSplits(job).size();
+    }
+
+    /** Kills the running job, if any; I/O failures are wrapped in {@link JobException}. */
+    public void kill() throws JobException {
+        if (job != null) {
+            try {
+                job.killJob();
+            } catch (IOException e) {
+                throw new JobException(e);
+            }
+        }
+    }
+
+    /** Returns the MR job id and tracking URL (when known) keyed by the {@link JobInstance} constants. */
+    public Map<String, String> getInfo() throws JobException {
+        if (job != null) {
+            Map<String, String> status = new HashMap<String, String>();
+            if (null != job.getJobID()) {
+                status.put(JobInstance.MR_JOB_ID, job.getJobID().toString());
+            }
+            if (null != job.getTrackingURL()) {
+                status.put(JobInstance.YARN_APP_URL, job.getTrackingURL().toString());
+            }
+
+            return status;
+        } else {
+            throw new JobException("Job is null");
+        }
+    }
+
+    /** Returns the job's counters; I/O failures are wrapped in {@link JobException}. */
+    public Counters getCounters() throws JobException {
+        if (job != null) {
+            try {
+                return job.getCounters();
+            } catch (IOException e) {
+                throw new JobException(e);
+            }
+        } else {
+            throw new JobException("Job is null");
+        }
+    }
+
+    public void setAsync(boolean isAsync) {
+        this.isAsync = isAsync;
+    }
+
+    public Job getJob() {
+        return this.job;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
new file mode 100644
index 0000000..3a40ce2
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+/**
+ * Configuration keys and numeric limits shared by the MapReduce batch jobs.
+ *
+ * <p>All fields are implicitly {@code public static final}. Several key strings contain
+ * historical typos (e.g. "statistics.ouput"); they are runtime keys and must not be changed.
+ */
+public interface BatchConstants {
+
+    // --- intermediate (flat) table ---
+    char INTERMEDIATE_TABLE_ROW_DELIMITER = '\177';
+    String CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER = "cube.intermediate.table.row.delimiter";
+
+    // --- cube / inverted-index identity ---
+    String CFG_CUBE_NAME = "cube.name";
+    String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
+    String CFG_II_NAME = "ii.name";
+    String CFG_II_SEGMENT_NAME = "ii.segment.name";
+    String CFG_IS_MERGE = "is.merge";
+
+    // --- input / output ---
+    String INPUT_DELIM = "input.delim";
+    String OUTPUT_PATH = "output.path";
+    String TABLE_NAME = "table.name";
+    String TABLE_COLUMNS = "table.columns";
+
+    // --- counters ---
+    String MAPREDUCE_COUTNER_GROUP_NAME = "Cube Builder";
+
+    // --- sampling / region sizing ---
+    String MAPPER_SAMPLE_NUMBER = "mapper.sample.number";
+    String REGION_NUMBER = "region.number";
+    String CUBE_CAPACITY = "cube.capacity";
+
+    // --- statistics ---
+    String CFG_STATISTICS_ENABLED = "statistics.enabled";
+    String CFG_STATISTICS_OUTPUT = "statistics.ouput";
+    String CFG_STATISTICS_SAMPLING_PERCENT = "statistics.sampling.percent";
+    String CFG_STATISTICS_CUBE_ESTIMATION = "cube_statistics.txt";
+    String CFG_STATISTICS_CUBOID_ESTIMATION = "cuboid_statistics.seq";
+
+    // --- limits ---
+    int COUNTER_MAX = 100000;
+    int ERROR_RECORD_THRESHOLD = 100;
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java
new file mode 100644
index 0000000..c8e74f6
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+
+import org.apache.commons.httpclient.ConnectTimeoutException;
+import org.apache.commons.httpclient.HttpClientError;
+import org.apache.commons.httpclient.params.HttpConnectionParams;
+import org.apache.commons.httpclient.protocol.ControllerThreadSocketFactory;
+import org.apache.commons.httpclient.protocol.SecureProtocolSocketFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Commons-HttpClient 3.x SSL socket factory backed by a lazily created TLS {@link SSLContext}
+ * initialised with a {@link DefaultX509TrustManager}.
+ *
+ * <p>SECURITY NOTE(review): that trust manager's {@code checkServerTrusted} accepts every
+ * certificate chain, so connections made through this factory do not authenticate the server.
+ * Only use it against trusted endpoints.
+ *
+ * @author xduo
+ */
+public class DefaultSslProtocolSocketFactory implements SecureProtocolSocketFactory {
+    /** Log object for this class. */
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultSslProtocolSocketFactory.class);
+    private SSLContext sslcontext = null;
+
+    /**
+     * Constructor for DefaultSslProtocolSocketFactory.
+     */
+    public DefaultSslProtocolSocketFactory() {
+        super();
+    }
+
+    /**
+     * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int,java.net.InetAddress,int)
+     */
+    public Socket createSocket(String host, int port, InetAddress clientHost, int clientPort) throws IOException, UnknownHostException {
+        return getSSLContext().getSocketFactory().createSocket(host, port, clientHost, clientPort);
+    }
+
+    /**
+     * Attempts to get a new socket connection to the given host within the
+     * given time limit.
+     *
+     * <p>
+     * A connection timeout of 0 creates the socket directly. Otherwise creation is delegated to
+     * {@link ControllerThreadSocketFactory}, which runs it on a controller thread and gives up
+     * with a {@link ConnectTimeoutException} if it does not finish within the timeout.
+     * </p>
+     *
+     * @param host
+     *            the host name/IP
+     * @param port
+     *            the port on the host
+     * @param localAddress
+     *            the local host name/IP to bind the socket to
+     * @param localPort
+     *            the port on the local machine
+     * @param params
+     *            {@link HttpConnectionParams Http connection parameters}
+     *
+     * @return Socket a new socket
+     *
+     * @throws IOException
+     *             if an I/O error occurs while creating the socket
+     * @throws UnknownHostException
+     *             if the IP address of the host cannot be determined
+     * @throws ConnectTimeoutException
+     *             if the socket cannot be created within the connection timeout
+     * @throws IllegalArgumentException
+     *             if {@code params} is null
+     */
+    public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort, final HttpConnectionParams params) throws IOException, UnknownHostException, ConnectTimeoutException {
+        if (params == null) {
+            throw new IllegalArgumentException("Parameters may not be null");
+        }
+
+        int timeout = params.getConnectionTimeout();
+
+        if (timeout == 0) {
+            return createSocket(host, port, localAddress, localPort);
+        } else {
+            // To be eventually deprecated when migrated to Java 1.4 or above
+            return ControllerThreadSocketFactory.createSocket(this, host, port, localAddress, localPort, timeout);
+        }
+    }
+
+    /**
+     * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int)
+     */
+    public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
+        return getSSLContext().getSocketFactory().createSocket(host, port);
+    }
+
+    /**
+     * @see SecureProtocolSocketFactory#createSocket(java.net.Socket,java.lang.String,int,boolean)
+     */
+    public Socket createSocket(Socket socket, String host, int port, boolean autoClose) throws IOException, UnknownHostException {
+        return getSSLContext().getSocketFactory().createSocket(socket, host, port, autoClose);
+    }
+
+    /**
+     * All instances of the same factory class are interchangeable, so equality is by class.
+     * (Comparing against an unrelated class would make a factory unequal even to itself.)
+     */
+    @Override
+    public boolean equals(Object obj) {
+        return ((obj != null) && obj.getClass().equals(getClass()));
+    }
+
+    /** Consistent with {@link #equals(Object)}: depends only on the runtime class. */
+    @Override
+    public int hashCode() {
+        return getClass().hashCode();
+    }
+
+    /** Builds a TLS context that uses {@link DefaultX509TrustManager} for trust decisions. */
+    private static SSLContext createEasySSLContext() {
+        try {
+            SSLContext context = SSLContext.getInstance("TLS");
+            context.init(null, new TrustManager[] { new DefaultX509TrustManager(null) }, null);
+
+            return context;
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            throw new HttpClientError(e.toString());
+        }
+    }
+
+    /** Returns the SSL context, creating it on first use. Not synchronized. */
+    private SSLContext getSSLContext() {
+        if (this.sslcontext == null) {
+            this.sslcontext = createEasySSLContext();
+        }
+
+        return this.sslcontext;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultX509TrustManager.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultX509TrustManager.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultX509TrustManager.java
new file mode 100644
index 0000000..d7901e5
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultX509TrustManager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * X509 trust manager that exposes the platform's accepted issuers but performs no certificate
+ * verification in {@link #checkServerTrusted} / {@link #checkClientTrusted}.
+ *
+ * <p>SECURITY NOTE(review): because both check methods are no-ops, any server or client
+ * certificate chain is accepted. This is only appropriate for trusted/internal endpoints.
+ *
+ * @author xduo
+ */
+public class DefaultX509TrustManager implements X509TrustManager {
+
+    /** Log object for this class. */
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultX509TrustManager.class);
+    private X509TrustManager standardTrustManager = null;
+
+    /**
+     * Creates the trust manager, backed by the platform's default X509 trust manager for
+     * {@code keystore} (a null keystore selects the platform default trust store).
+     *
+     * @throws NoSuchAlgorithmException if no X509 trust manager is available
+     * @throws KeyStoreException if the trust manager factory cannot be initialised
+     */
+    public DefaultX509TrustManager(KeyStore keystore) throws NoSuchAlgorithmException, KeyStoreException {
+        super();
+
+        // A TrustManagerFactory must use a trust-manager algorithm, i.e. its own default
+        // rather than KeyManagerFactory's.
+        TrustManagerFactory factory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        factory.init(keystore);
+
+        // pick the first X509 manager instead of blindly casting element 0
+        for (TrustManager trustManager : factory.getTrustManagers()) {
+            if (trustManager instanceof X509TrustManager) {
+                this.standardTrustManager = (X509TrustManager) trustManager;
+                return;
+            }
+        }
+
+        throw new NoSuchAlgorithmException("No X509 trust manager available for algorithm " + factory.getAlgorithm());
+    }
+
+    public X509Certificate[] getAcceptedIssuers() {
+        return this.standardTrustManager.getAcceptedIssuers();
+    }
+
+    /** Legacy check; always returns true. */
+    public boolean isClientTrusted(X509Certificate[] certificates) {
+        return true;
+    }
+
+    /**
+     * Legacy check. Logs the chain at debug level; a single-certificate chain is trusted iff it is
+     * currently within its validity period, any other chain (including null) is trusted.
+     */
+    public boolean isServerTrusted(X509Certificate[] certificates) {
+        if ((certificates != null) && LOG.isDebugEnabled()) {
+            LOG.debug("Server certificate chain:");
+
+            for (int i = 0; i < certificates.length; i++) {
+                LOG.debug("X509Certificate[" + i + "]=" + certificates[i]);
+            }
+        }
+
+        if ((certificates != null) && (certificates.length == 1)) {
+            X509Certificate certificate = certificates[0];
+
+            try {
+                certificate.checkValidity();
+            } catch (CertificateException e) {
+                LOG.error(e.toString());
+
+                return false;
+            }
+
+            return true;
+        } else {
+            return true;
+        }
+    }
+
+    /** SECURITY: intentionally a no-op — every client certificate chain is accepted. */
+    @Override
+    public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+    }
+
+    /** SECURITY: intentionally a no-op — every server certificate chain is accepted. */
+    @Override
+    public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
new file mode 100644
index 0000000..294c957
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Collects status and counter information for a Hadoop {@link Job} into a caller-supplied
+ * output buffer.
+ *
+ * @author xduo
+ */
+public class HadoopCmdOutput {
+
+    protected static final Logger log = LoggerFactory.getLogger(HadoopCmdOutput.class);
+
+    private final StringBuilder output;
+    private final Job job;
+
+    /**
+     * @param job the job to report on; may be null
+     * @param output buffer that receives counter dumps and error messages
+     */
+    public HadoopCmdOutput(Job job, StringBuilder output) {
+        super();
+        this.job = job;
+        this.output = output;
+    }
+
+    /** Returns the MR job id, or null if the job or its id is not available. */
+    public String getMrJobId() {
+        return getInfo().get(ExecutableConstants.MR_JOB_ID);
+    }
+
+    /**
+     * Returns the MR job id and tracking URL (when known) keyed by the
+     * {@link ExecutableConstants} constants; an empty map when there is no job.
+     */
+    public Map<String, String> getInfo() {
+        if (job != null) {
+            Map<String, String> status = new HashMap<String, String>();
+            if (null != job.getJobID()) {
+                status.put(ExecutableConstants.MR_JOB_ID, job.getJobID().toString());
+            }
+            if (null != job.getTrackingURL()) {
+                status.put(ExecutableConstants.YARN_APP_URL, job.getTrackingURL().toString());
+            }
+            return status;
+        } else {
+            return Collections.emptyMap();
+        }
+    }
+
+    // Populated by updateJobCounter(); null until a successful update.
+    private String mapInputRecords;
+    private String hdfsBytesWritten;
+    private String hdfsBytesRead;
+
+    public String getMapInputRecords() {
+        return mapInputRecords;
+    }
+
+    public String getHdfsBytesWritten() {
+        return hdfsBytesWritten;
+    }
+
+    public String getHdfsBytesRead() {
+        return hdfsBytesRead;
+    }
+
+    /**
+     * Fetches the job's counters, appends their text dump to the output buffer, and records the
+     * map-input-record and HDFS read/write byte counts. Failures are logged and appended to the
+     * output buffer rather than thrown.
+     */
+    public void updateJobCounter() {
+        if (job == null) {
+            String errorMsg = "no job to read counters from";
+            log.warn(errorMsg);
+            output.append(errorMsg);
+            return;
+        }
+        try {
+            Counters counters = job.getCounters();
+            if (counters == null) {
+                String errorMsg = "no counters for job " + getMrJobId();
+                log.warn(errorMsg);
+                output.append(errorMsg);
+                return;
+            }
+            String countersText = counters.toString();
+            this.output.append(countersText).append("\n");
+            log.debug(countersText);
+
+            mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
+            hdfsBytesWritten = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue());
+            hdfsBytesRead = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue());
+        } catch (Exception e) {
+            // some exceptions carry no message; fall back to toString() rather than appending "null"
+            String msg = e.getLocalizedMessage() != null ? e.getLocalizedMessage() : e.toString();
+            log.error(msg, e);
+            output.append(msg);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
new file mode 100644
index 0000000..089df5f
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.Constructor;
+
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Job step that runs an {@link AbstractHadoopJob} synchronously in-process through
+ * {@link ToolRunner} and maps its exit code to an {@link ExecuteResult}
+ * (0 = SUCCEED, anything else = FAILED).
+ */
+public class HadoopShellExecutable extends AbstractExecutable {
+
+    private static final String KEY_MR_JOB = "HADOOP_SHELL_JOB_CLASS";
+    private static final String KEY_PARAMS = "HADOOP_SHELL_JOB_PARAMS";
+
+    /** Result code recorded when ToolRunner throws instead of returning an exit code. */
+    private static final int EXCEPTION_RESULT_CODE = 2;
+
+    public HadoopShellExecutable() {
+        super();
+    }
+
+    /**
+     * Instantiates the configured job class via its no-arg constructor and runs it with the
+     * whitespace-separated job params.
+     *
+     * @return SUCCEED/FAILED from the job's exit code (the output log ends with "result code:N"),
+     *         or ERROR if the job class cannot be instantiated or another exception occurs
+     * @throws NullPointerException if the job class or params were never set
+     */
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final String mapReduceJobClass = getJobClass();
+        final String params = getJobParams();
+        Preconditions.checkNotNull(mapReduceJobClass, "job class (%s) is not set", KEY_MR_JOB);
+        Preconditions.checkNotNull(params, "job params (%s) are not set", KEY_PARAMS);
+        try {
+            final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
+            final AbstractHadoopJob job = constructor.newInstance();
+            final String[] args = splitParams(params);
+            logger.info("parameters of the HadoopShellExecutable:");
+            logger.info(params);
+            int result;
+            final StringBuilder log = new StringBuilder();
+            try {
+                result = ToolRunner.run(job, args);
+            } catch (Exception ex) {
+                logger.error("error execute " + this.toString(), ex);
+                // keep the full stack trace in the step output for the job UI
+                StringWriter stringWriter = new StringWriter();
+                ex.printStackTrace(new PrintWriter(stringWriter));
+                log.append(stringWriter.toString()).append("\n");
+                result = EXCEPTION_RESULT_CODE;
+            }
+            log.append("result code:").append(result);
+            return result == 0 ? new ExecuteResult(ExecuteResult.State.SUCCEED, log.toString()) : new ExecuteResult(ExecuteResult.State.FAILED, log.toString());
+        } catch (ReflectiveOperationException e) {
+            logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        } catch (Exception e) {
+            logger.error("error execute " + this.toString(), e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+
+    /**
+     * Splits whitespace-separated params into arguments. Blank input yields no arguments
+     * rather than a single empty-string argument.
+     */
+    private static String[] splitParams(String params) {
+        final String trimmed = params.trim();
+        return trimmed.isEmpty() ? new String[0] : trimmed.split("\\s+");
+    }
+
+    public void setJobClass(Class<? extends AbstractHadoopJob> clazzName) {
+        setParam(KEY_MR_JOB, clazzName.getName());
+    }
+
+    public String getJobClass() throws ExecuteException {
+        return getParam(KEY_MR_JOB);
+    }
+
+    public void setJobParams(String param) {
+        setParam(KEY_PARAMS, param);
+    }
+
+    public String getJobParams() {
+        return getParam(KEY_PARAMS);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java
new file mode 100644
index 0000000..1b71b92
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author xduo
+ *
+ */
+public class HadoopStatusChecker {
+
+ protected static final Logger logger = LoggerFactory.getLogger(HadoopStatusChecker.class);
+
+ private final String yarnUrl;
+ private final String mrJobID;
+ private final StringBuilder output;
+
+ public HadoopStatusChecker(String yarnUrl, String mrJobID, StringBuilder output) {
+ this.yarnUrl = yarnUrl;
+ this.mrJobID = mrJobID;
+ this.output = output;
+ }
+
+ public JobStepStatusEnum checkStatus() {
+ if (null == mrJobID) {
+ this.output.append("Skip status check with empty job id..\n");
+ return JobStepStatusEnum.WAITING;
+ }
+ JobStepStatusEnum status = null;
+ try {
+ final Pair<RMAppState, FinalApplicationStatus> result = new HadoopStatusGetter(yarnUrl, mrJobID).get();
+ logger.debug("State of Hadoop job: " + mrJobID + ":" + result.getLeft() + "-" + result.getRight());
+ output.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").format(new Date()) + " - State of Hadoop job: " + mrJobID + ":" + result.getLeft() + " - " + result.getRight() + "\n");
+
+ switch (result.getRight()) {
+ case SUCCEEDED:
+ status = JobStepStatusEnum.FINISHED;
+ break;
+ case FAILED:
+ status = JobStepStatusEnum.ERROR;
+ break;
+ case KILLED:
+ status = JobStepStatusEnum.KILLED;
+ break;
+ case UNDEFINED:
+ switch (result.getLeft()) {
+ case NEW:
+ case NEW_SAVING:
+ case SUBMITTED:
+ case ACCEPTED:
+ status = JobStepStatusEnum.WAITING;
+ break;
+ case RUNNING:
+ status = JobStepStatusEnum.RUNNING;
+ break;
+ case FINAL_SAVING:
+ case FINISHING:
+ case FINISHED:
+ case FAILED:
+ case KILLING:
+ case KILLED:
+ }
+ break;
+ }
+ } catch (Exception e) {
+ logger.error("error check status", e);
+ output.append("Exception: " + e.getLocalizedMessage() + "\n");
+ status = JobStepStatusEnum.ERROR;
+ }
+
+ return status;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
new file mode 100644
index 0000000..53b8850
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+
+import org.apache.commons.httpclient.Header;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpMethod;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.protocol.Protocol;
+import org.apache.commons.httpclient.protocol.ProtocolSocketFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reads the YARN application state and final status of an MR job from a ResourceManager
+ * REST URL, following "Refresh"-header redirects.
+ */
+public class HadoopStatusGetter {
+
+    /** Maximum number of "Refresh" redirects followed for one request before giving up. */
+    private static final int MAX_REDIRECTS = 10;
+
+    private final String mrJobId;
+    private final String yarnUrl;
+
+    protected static final Logger log = LoggerFactory.getLogger(HadoopStatusGetter.class);
+
+    /**
+     * @param yarnUrl status URL template; "${job_id}" is replaced by the YARN application id
+     * @param mrJobId MR job id; every "job" substring is rewritten to "application" to form the app id
+     */
+    public HadoopStatusGetter(String yarnUrl, String mrJobId) {
+        this.yarnUrl = yarnUrl;
+        this.mrJobId = mrJobId;
+    }
+
+    /**
+     * Queries the application status.
+     *
+     * @return (RM application state, final application status)
+     * @throws IOException if the request fails, returns a non-2xx status or an empty body,
+     *                     redirects too often, or the JSON lacks a textual "state"/"finalStatus"
+     */
+    public Pair<RMAppState, FinalApplicationStatus> get() throws IOException {
+        String applicationId = mrJobId.replace("job", "application");
+        String url = yarnUrl.replace("${job_id}", applicationId);
+        JsonNode root = new ObjectMapper().readTree(getHttpResponse(url));
+        RMAppState state = RMAppState.valueOf(getRequiredText(root, "state", url));
+        FinalApplicationStatus finalStatus = FinalApplicationStatus.valueOf(getRequiredText(root, "finalStatus", url));
+        return Pair.of(state, finalStatus);
+    }
+
+    /** Returns the text of the first field named {@code field} found anywhere under {@code root}. */
+    private static String getRequiredText(JsonNode root, String field, String url) throws IOException {
+        final JsonNode node = root == null ? null : root.findValue(field);
+        final String text = node == null ? null : node.getTextValue();
+        if (text == null) {
+            throw new IOException("Missing '" + field + "' in YARN response for " + url);
+        }
+        return text;
+    }
+
+    /**
+     * GETs {@code url} (with "anonymous=true" appended if absent) as JSON and returns the body,
+     * following "Refresh: ...url=..." redirects at most MAX_REDIRECTS times.
+     */
+    private String getHttpResponse(String url) throws IOException {
+        HttpClient client = new HttpClient();
+
+        for (int redirects = 0; redirects <= MAX_REDIRECTS; redirects++) {
+            if (url.startsWith("https://")) {
+                registerEasyHttps();
+            }
+            if (!url.contains("anonymous=true")) {
+                url += url.contains("?") ? "&" : "?";
+                url += "anonymous=true";
+            }
+
+            HttpMethod get = new GetMethod(url);
+            get.addRequestHeader("accept", "application/json");
+
+            try {
+                int statusCode = client.executeMethod(get);
+
+                String redirect = null;
+                Header h = get.getResponseHeader("Refresh");
+                if (h != null) {
+                    String s = h.getValue();
+                    int cut = s.indexOf("url=");
+                    if (cut >= 0) {
+                        redirect = s.substring(cut + 4);
+                    }
+                }
+
+                if (redirect != null) {
+                    url = redirect;
+                    log.debug("Job " + mrJobId + " check redirect url " + url + ".\n");
+                    continue;
+                }
+                if (statusCode < 200 || statusCode >= 300) {
+                    throw new IOException("HTTP status " + statusCode + " when checking job " + mrJobId + " at " + url);
+                }
+                String response = get.getResponseBodyAsString();
+                if (response == null) {
+                    throw new IOException("Empty response when checking job " + mrJobId + " at " + url);
+                }
+                log.debug("Job " + mrJobId + " get status check result.\n");
+                return response;
+            } finally {
+                get.releaseConnection();
+            }
+        }
+        throw new IOException("Too many redirects (> " + MAX_REDIRECTS + ") when checking job " + mrJobId + ", last url: " + url);
+    }
+
+    private static Protocol EASY_HTTPS = null;
+
+    /**
+     * Registers DefaultSslProtocolSocketFactory for "https" through the static
+     * Protocol.registerProtocol, so it affects every commons-httpclient https request in this classloader.
+     * NOTE(review): the original comment says this bypasses all https issues, i.e. presumably disables
+     * certificate validation — a MITM risk; confirm this is intended.
+     */
+    private static synchronized void registerEasyHttps() {
+        if (EASY_HTTPS == null) {
+            EASY_HTTPS = new Protocol("https", (ProtocolSocketFactory) new DefaultSslProtocolSocketFactory(), 443);
+            Protocol.registerProtocol("https", EASY_HTTPS);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
new file mode 100644
index 0000000..b62fd21
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.Constructor;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.execution.Output;
+
+import com.google.common.base.Preconditions;
+
+/**
+ */
+public class MapReduceExecutable extends AbstractExecutable {
+
+ private static final String KEY_MR_JOB = "MR_JOB_CLASS";
+ private static final String KEY_PARAMS = "MR_JOB_PARAMS";
+ private static final String KEY_COUNTER_SAVEAS = "MR_COUNTER_SAVEAS";
+
+ public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
+
+ public MapReduceExecutable() {
+ super();
+ }
+
+ @Override
+ protected void onExecuteStart(ExecutableContext executableContext) {
+ final Output output = executableManager.getOutput(getId());
+ if (output.getExtra().containsKey(START_TIME)) {
+ final String mrJobId = output.getExtra().get(ExecutableConstants.MR_JOB_ID);
+ if (mrJobId == null) {
+ executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+ return;
+ }
+ try {
+ Job job = new Cluster(new Configuration()).getJob(JobID.forName(mrJobId));
+ if (job.getJobState() == JobStatus.State.FAILED) {
+ //remove previous mr job info
+ super.onExecuteStart(executableContext);
+ } else {
+ executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+ }
+ } catch (IOException e) {
+ logger.warn("error get hadoop status");
+ super.onExecuteStart(executableContext);
+ } catch (InterruptedException e) {
+ logger.warn("error get hadoop status");
+ super.onExecuteStart(executableContext);
+ }
+ } else {
+ super.onExecuteStart(executableContext);
+ }
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ final String mapReduceJobClass = getMapReduceJobClass();
+ String params = getMapReduceParams();
+ Preconditions.checkNotNull(mapReduceJobClass);
+ Preconditions.checkNotNull(params);
+ try {
+ Job job;
+ final Map<String, String> extra = executableManager.getOutput(getId()).getExtra();
+ if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) {
+ job = new Cluster(new Configuration()).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
+ logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID + " resumed"));
+ } else {
+ final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
+ final AbstractHadoopJob hadoopJob = constructor.newInstance();
+ hadoopJob.setAsync(true); // so the ToolRunner.run() returns right away
+ logger.info("parameters of the MapReduceExecutable:");
+ logger.info(params);
+ String[] args = params.trim().split("\\s+");
+ try {
+ //for async mr job, ToolRunner just return 0;
+ ToolRunner.run(hadoopJob, args);
+ } catch (Exception ex) {
+ StringBuilder log = new StringBuilder();
+ logger.error("error execute " + this.toString(), ex);
+ StringWriter stringWriter = new StringWriter();
+ ex.printStackTrace(new PrintWriter(stringWriter));
+ log.append(stringWriter.toString()).append("\n");
+ log.append("result code:").append(2);
+ return new ExecuteResult(ExecuteResult.State.ERROR, log.toString());
+ }
+ job = hadoopJob.getJob();
+ }
+ final StringBuilder output = new StringBuilder();
+ final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output);
+
+ final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig());
+ if (restStatusCheckUrl == null) {
+ logger.error("restStatusCheckUrl is null");
+ return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null");
+ }
+ String mrJobId = hadoopCmdOutput.getMrJobId();
+ HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output);
+ JobStepStatusEnum status = JobStepStatusEnum.NEW;
+ while (!isDiscarded()) {
+ JobStepStatusEnum newStatus = statusChecker.checkStatus();
+ if (status == JobStepStatusEnum.KILLED) {
+ executableManager.updateJobOutput(getId(), ExecutableState.ERROR, Collections.<String, String>emptyMap(), "killed by admin");
+ return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin");
+ }
+ if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) {
+ final long waitTime = System.currentTimeMillis() - getStartTime();
+ setMapReduceWaitTime(waitTime);
+ }
+ status = newStatus;
+ executableManager.addJobInfo(getId(), hadoopCmdOutput.getInfo());
+ if (status.isComplete()) {
+ final Map<String, String> info = hadoopCmdOutput.getInfo();
+ readCounters(hadoopCmdOutput, info);
+ executableManager.addJobInfo(getId(), info);
+
+ if (status == JobStepStatusEnum.FINISHED) {
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+ } else {
+ return new ExecuteResult(ExecuteResult.State.FAILED, output.toString());
+ }
+ }
+ Thread.sleep(context.getConfig().getYarnStatusCheckIntervalSeconds() * 1000);
+ }
+ //TODO kill discarded mr job using "hadoop job -kill " + mrJobId
+
+ return new ExecuteResult(ExecuteResult.State.DISCARDED, output.toString());
+
+ } catch (ReflectiveOperationException e) {
+ logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ } catch (Exception e) {
+ logger.error("error execute " + this.toString(), e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+ }
+
+ private void readCounters(final HadoopCmdOutput hadoopCmdOutput, final Map<String, String> info) {
+ hadoopCmdOutput.updateJobCounter();
+ info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
+ info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead());
+ info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
+
+ String saveAs = getParam(KEY_COUNTER_SAVEAS);
+ if (saveAs != null) {
+ String[] saveAsNames = saveAs.split(",");
+ saveCounterAs(hadoopCmdOutput.getMapInputRecords(), saveAsNames, 0, info);
+ saveCounterAs(hadoopCmdOutput.getHdfsBytesRead(), saveAsNames, 1, info);
+ saveCounterAs(hadoopCmdOutput.getHdfsBytesWritten(), saveAsNames, 2, info);
+ }
+ }
+
+ private void saveCounterAs(String counter, String[] saveAsNames, int i, Map<String, String> info) {
+ if (saveAsNames.length > i && StringUtils.isBlank(saveAsNames[i]) == false) {
+ info.put(saveAsNames[i].trim(), counter);
+ }
+ }
+
+ private String getRestStatusCheckUrl(Job job, KylinConfig config) {
+ final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
+ if (yarnStatusCheckUrl != null) {
+ return yarnStatusCheckUrl;
+ } else {
+ logger.info(KylinConfig.KYLIN_JOB_YARN_APP_REST_CHECK_URL + " is not set, read from job configuration");
+ }
+ String rmWebHost = job.getConfiguration().get("yarn.resourcemanager.webapp.address");
+ if (StringUtils.isEmpty(rmWebHost)) {
+ return null;
+ }
+ if (rmWebHost.startsWith("http://") || rmWebHost.startsWith("https://")) {
+ //do nothing
+ } else {
+ rmWebHost = "http://" + rmWebHost;
+ }
+ logger.info("yarn.resourcemanager.webapp.address:" + rmWebHost);
+ return rmWebHost + "/ws/v1/cluster/apps/${job_id}?anonymous=true";
+ }
+
+ public long getMapReduceWaitTime() {
+ return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
+ }
+
+ public void setMapReduceWaitTime(long t) {
+ addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
+ }
+
+ public void setMapReduceJobClass(Class<? extends AbstractHadoopJob> clazzName) {
+ setParam(KEY_MR_JOB, clazzName.getName());
+ }
+
+ public String getMapReduceJobClass() throws ExecuteException {
+ return getParam(KEY_MR_JOB);
+ }
+
+ public void setMapReduceParams(String param) {
+ setParam(KEY_PARAMS, param);
+ }
+
+ public String getMapReduceParams() {
+ return getParam(KEY_PARAMS);
+ }
+
+ public String getCounterSaveAs() {
+ return getParam(KEY_COUNTER_SAVEAS);
+ }
+
+ public void setCounterSaveAs(String value) {
+ setParam(KEY_COUNTER_SAVEAS, value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidJob.java
new file mode 100644
index 0000000..2e716eb
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidJob.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Cuboid job whose mapper is {@link HiveToBaseCuboidMapper} (presumably producing the base
+ * cuboid from the Hive flat table).
+ *
+ * @author honma
+ */
+public class BaseCuboidJob extends CuboidJob {
+
+    public BaseCuboidJob() {
+        setMapperClass(HiveToBaseCuboidMapper.class);
+    }
+
+    /** Command-line entry point: runs the job via ToolRunner and exits with its return code. */
+    public static void main(String[] args) throws Exception {
+        final int exitCode = ToolRunner.run(new BaseCuboidJob(), args);
+        System.exit(exitCode);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
new file mode 100644
index 0000000..9bc84ff
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -0,0 +1,205 @@
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.collect.Lists;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesSplitter;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Base mapper that encodes one delimited flat-table row into a base-cuboid (rowkey, measures)
+ * Text pair. No map() is defined here: subclasses are expected to fill {@link #bytesSplitter}
+ * and call {@link #outputKV}, reporting bad rows through {@link #handleErrorRecord}.
+ */
+public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
+    protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapperBase.class);
+    public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
+    public static final byte[] ONE = Bytes.toBytes("1");
+    protected String cubeName;
+    protected String segmentName;
+    protected Cuboid baseCuboid;
+    protected CubeInstance cube;
+    protected CubeDesc cubeDesc;
+    protected CubeSegment cubeSegment;
+    protected List<byte[]> nullBytes;
+    protected CubeJoinedFlatTableDesc intermediateTableDesc;
+    protected String intermediateTableRowDelimiter;
+    protected byte byteRowDelimiter;
+    protected int counter;
+    protected Object[] measures;
+    protected byte[][] keyBytesBuf;
+    protected BytesSplitter bytesSplitter;
+    protected AbstractRowKeyEncoder rowKeyEncoder;
+    protected MeasureCodec measureCodec;
+    private int errorRecordCounter;
+    private Text outputKey = new Text();
+    private Text outputValue = new Text();
+    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+
+    /**
+     * Loads the cube, its NEW segment and the encoders from the job configuration.
+     *
+     * @throws RuntimeException if the row delimiter is not exactly one byte
+     * @throws IllegalStateException if the cube or its NEW segment cannot be found
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
+        intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
+        final byte[] delimiterBytes = Bytes.toBytes(intermediateTableRowDelimiter);
+        // exactly one byte: an empty delimiter would otherwise fail on the [0] access below
+        if (delimiterBytes.length != 1) {
+            throw new RuntimeException("Expected delimiter byte length is 1, but got " + delimiterBytes.length);
+        }
+        byteRowDelimiter = delimiterBytes[0];
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        cube = CubeManager.getInstance(config).getCube(cubeName);
+        if (cube == null) {
+            throw new IllegalStateException("Cube " + cubeName + " not found");
+        }
+        cubeDesc = cube.getDescriptor();
+        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        if (cubeSegment == null) {
+            throw new IllegalStateException("No NEW segment " + segmentName + " in cube " + cubeName);
+        }
+
+        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+
+        intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
+
+        bytesSplitter = new BytesSplitter(200, 4096);
+        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
+
+        measureCodec = new MeasureCodec(cubeDesc.getMeasures());
+        measures = new Object[cubeDesc.getMeasures().size()];
+
+        int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
+        keyBytesBuf = new byte[colCount][];
+
+        initNullBytes();
+    }
+
+    /** Collects the byte patterns treated as null: Hive's "\N" plus the cube's configured null strings. */
+    private void initNullBytes() {
+        nullBytes = Lists.newArrayList();
+        nullBytes.add(HIVE_NULL);
+        String[] nullStrings = cubeDesc.getNullStrings();
+        if (nullStrings != null) {
+            for (String s : nullStrings) {
+                nullBytes.add(Bytes.toBytes(s));
+            }
+        }
+    }
+
+    /** @return true if {@code v} equals one of the null markers */
+    private boolean isNull(byte[] v) {
+        for (byte[] nullByte : nullBytes) {
+            if (Bytes.equals(v, nullByte))
+                return true;
+        }
+        return false;
+    }
+
+    /** Encodes the base-cuboid rowkey from the row-key columns of the split row; null markers become null. */
+    private byte[] buildKey(SplittedBytes[] splitBuffers) {
+        int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
+        for (int i = 0; i < baseCuboid.getColumns().size(); i++) {
+            int index = rowKeyColumnIndexes[i];
+            keyBytesBuf[i] = Arrays.copyOf(splitBuffers[index].value, splitBuffers[index].length);
+            if (isNull(keyBytesBuf[i])) {
+                keyBytesBuf[i] = null;
+            }
+        }
+        return rowKeyEncoder.encode(keyBytesBuf);
+    }
+
+    /** Parses every measure from the split row and encodes them into {@code valueBuf}. */
+    private void buildValue(SplittedBytes[] splitBuffers) {
+
+        for (int i = 0; i < measures.length; i++) {
+            byte[] valueBytes = getValueBytes(splitBuffers, i);
+            measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
+        }
+
+        valueBuf.clear();
+        measureCodec.encode(measures, valueBuf);
+    }
+
+    /**
+     * Returns the raw input bytes for one measure: the parameter constant when the measure has no
+     * flat-table columns, otherwise the concatenation of its column values. COUNT and holistic
+     * COUNT DISTINCT always yield "1". Null markers map to null.
+     */
+    private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) {
+        MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
+        FunctionDesc func = desc.getFunction();
+        ParameterDesc paramDesc = func.getParameter();
+        int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
+
+        byte[] result = null;
+
+        // constant
+        if (flatTableIdx == null) {
+            result = Bytes.toBytes(paramDesc.getValue());
+        }
+        // column values
+        else {
+            // for multiple columns, their values are joined
+            for (int i = 0; i < flatTableIdx.length; i++) {
+                SplittedBytes split = splitBuffers[flatTableIdx[i]];
+                if (result == null) {
+                    result = Arrays.copyOf(split.value, split.length);
+                } else {
+                    byte[] newResult = new byte[result.length + split.length];
+                    System.arraycopy(result, 0, newResult, 0, result.length);
+                    System.arraycopy(split.value, 0, newResult, result.length, split.length);
+                    result = newResult;
+                }
+            }
+        }
+
+        if (func.isCount() || func.isHolisticCountDistinct()) {
+            // note for holistic count distinct, this value will be ignored
+            result = ONE;
+        }
+
+        if (isNull(result)) {
+            result = null;
+        }
+
+        return result;
+    }
+
+    /** Writes the (rowkey, encoded measures) pair for the row currently held in {@code bytesSplitter}. */
+    protected void outputKV(Context context) throws IOException, InterruptedException {
+        intermediateTableDesc.sanityCheck(bytesSplitter);
+
+        byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
+        outputKey.set(rowKey, 0, rowKey.length);
+
+        buildValue(bytesSplitter.getSplitBuffers());
+        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+        context.write(outputKey, outputValue);
+    }
+
+    /**
+     * Logs a bad record and rethrows {@code ex} once more than ERROR_RECORD_THRESHOLD records
+     * have failed; earlier failures are tolerated.
+     */
+    protected void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException {
+
+        logger.error("Insane record: " + bytesSplitter, ex);
+
+        errorRecordCounter++;
+        if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
+            if (ex instanceof IOException)
+                throw (IOException) ex;
+            else if (ex instanceof RuntimeException)
+                throw (RuntimeException) ex;
+            else
+                throw new RuntimeException("Error record count " + errorRecordCounter + " exceeds threshold " + BatchConstants.ERROR_RECORD_THRESHOLD, ex);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
new file mode 100644
index 0000000..4b4c815
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.cli.DictionaryGeneratorCLI;
+import org.apache.kylin.dict.DistinctColumnValuesProvider;
+import org.apache.kylin.engine.mr.DFSFileTable;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable;
+
+/**
+ * Hadoop tool that builds the dictionaries of one cube segment.
+ * <p>
+ * The distinct values of each column are read from a file named after the column, located directly
+ * under the input path option (presumably written by an earlier fact-distinct-columns step; confirm
+ * against the job flow). Dictionary generation itself is delegated to
+ * {@link DictionaryGeneratorCLI#processSegment}.
+ *
+ * @author ysong1
+ */
+
+public class CreateDictionaryJob extends AbstractHadoopJob {
+
+    private int returnCode = 0;
+
+    /**
+     * Parses the cube name, segment name and input path options, then builds the segment's
+     * dictionaries. On any failure the usage text is printed and the exception is rethrown.
+     *
+     * @param args command-line arguments
+     * @return the job's return code
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            parseOptions(options, args);
+
+            final String cube = getOptionValue(OPTION_CUBE_NAME);
+            final String segment = getOptionValue(OPTION_SEGMENT_NAME);
+            final String distinctValuesDir = getOptionValue(OPTION_INPUT_PATH);
+
+            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+
+            // each column's distinct values live in "<input path>/<column name>"
+            DistinctColumnValuesProvider distinctValues = new DistinctColumnValuesProvider() {
+                @Override
+                public ReadableTable getDistinctValuesFor(TblColRef column) {
+                    return new DFSFileTable(distinctValuesDir + "/" + column.getName(), -1);
+                }
+            };
+
+            DictionaryGeneratorCLI.processSegment(kylinConfig, cube, segment, distinctValues);
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+
+        return returnCode;
+    }
+
+    /** Command-line entry point; runs the job through {@link ToolRunner} and exits with its code. */
+    public static void main(String[] args) throws Exception {
+        System.exit(ToolRunner.run(new CreateDictionaryJob(), args));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
new file mode 100644
index 0000000..88edfe1
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidCLI;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.exception.JobException;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Hadoop job that builds one layer of cuboids for a cube segment.
+ * <p>
+ * If the input path option is blank (base cuboid case), the mapper reads the segment's flat table
+ * through the input side returned by {@code MRUtil.getBatchCubingInputSide}; otherwise it reads the
+ * previous layer's output from the input path, as sequence files by default or as text when the
+ * input format option is {@code textinputformat}. Results are written as sequence files of
+ * {@link Text} key/value pairs. {@link #setMapperClass(Class)} must be called before
+ * {@link #run(String[])}.
+ *
+ * @author ysong1
+ */
+public class CuboidJob extends AbstractHadoopJob {
+
+    protected static final Logger logger = LoggerFactory.getLogger(CuboidJob.class);
+    private static final String MAPRED_REDUCE_TASKS = "mapred.reduce.tasks";
+
+    @SuppressWarnings("rawtypes")
+    private Class<? extends Mapper> mapperClass;
+
+    /**
+     * Parses the command line, configures the MapReduce job and waits for it to complete.
+     *
+     * @param args command-line arguments
+     * @return the result of {@code waitForCompletion(job)}
+     * @throws IllegalStateException if no mapper class has been set
+     * @throws IllegalArgumentException if the cube, or the NEW segment required for the base
+     *             cuboid, cannot be found
+     * @throws Exception any other failure; it is logged and the usage text printed before rethrow
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        if (this.mapperClass == null) {
+            throw new IllegalStateException("Mapper class is not set!");
+        }
+
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_NCUBOID_LEVEL);
+            options.addOption(OPTION_INPUT_FORMAT);
+            parseOptions(options, args);
+
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL));
+            String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            CubeManager cubeMgr = CubeManager.getInstance(config);
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            if (cube == null) {
+                // fail with a clear message instead of a NullPointerException further down
+                throw new IllegalArgumentException("Cube '" + cubeName + "' not found");
+            }
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            logger.info("Starting: {}", job.getJobName());
+
+            setJobClasspath(job);
+
+            // Mapper
+            configureMapperInputFormat(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
+            job.setMapperClass(this.mapperClass);
+            job.setMapOutputKeyClass(Text.class);
+            job.setMapOutputValueClass(Text.class);
+            // for base cuboid shuffle skew, some rowkey aggregates far more records than others
+            job.setCombinerClass(CuboidReducer.class);
+
+            // Reducer
+            job.setReducerClass(CuboidReducer.class);
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(Text.class);
+
+            FileOutputFormat.setOutputPath(job, output);
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+            // add metadata to distributed cache
+            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+            setReduceTaskNum(job, config, cubeName, nCuboidLevel);
+
+            this.deletePath(job.getConfiguration(), output);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            logger.error("error in CuboidJob", e);
+            printUsage(options);
+            throw e;
+        }
+    }
+
+    /**
+     * Configures the job's input: the segment's flat table when the input path option is blank
+     * (base cuboid), otherwise the files under the input path (n-dimension cuboid).
+     *
+     * @param cubeSeg the NEW segment being built; only required in the base cuboid case
+     * @throws IllegalArgumentException if {@code cubeSeg} is null in the base cuboid case
+     */
+    private void configureMapperInputFormat(CubeSegment cubeSeg) throws IOException {
+        String input = getOptionValue(OPTION_INPUT_PATH);
+
+        if (StringUtils.isBlank(input)) {
+            // base cuboid case
+            if (cubeSeg == null) {
+                throw new IllegalArgumentException("Segment '" + getOptionValue(OPTION_SEGMENT_NAME) + "' in NEW status not found");
+            }
+            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+            flatTableInputFormat.configureJob(job);
+        } else {
+            // n-dimension cuboid case
+            FileInputFormat.setInputPaths(job, new Path(input));
+            if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) {
+                job.setInputFormatClass(TextInputFormat.class);
+            } else {
+                job.setInputFormatClass(SequenceFileInputFormat.class);
+            }
+        }
+    }
+
+    /**
+     * Estimates the number of reduce tasks for the given cuboid level and stores it in the job
+     * configuration under {@code mapred.reduce.tasks}.
+     * <p>
+     * The total map input is scaled by the ratio of this level's cuboid count to the previous
+     * level's, divided by the configured input MB per reducer and multiplied by the configured
+     * reducer count ratio; cubes with holistic count distinct measures get 4 times as many
+     * reducers. The result is kept between 1 and the configured maximum reducer number.
+     *
+     * @param job the job to configure
+     * @param config Kylin configuration supplying the reducer sizing settings
+     * @param cubeName name of the cube being built
+     * @param level cuboid level; 0 is the base cuboid
+     */
+    protected void setReduceTaskNum(Job job, KylinConfig config, String cubeName, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
+        Configuration jobConf = job.getConfiguration();
+
+        CubeDesc cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
+
+        // use the configuration passed in rather than re-reading the environment
+        double perReduceInputMB = config.getDefaultHadoopJobReducerInputMB();
+        double reduceCountRatio = config.getDefaultHadoopJobReducerCountRatio();
+
+        // total map input MB
+        double totalMapInputMB = this.getTotalMapInputMB();
+
+        // output / input ratio
+        int preLevelCuboids, thisLevelCuboids;
+        if (level == 0) { // base cuboid
+            preLevelCuboids = thisLevelCuboids = 1;
+        } else { // n-cuboid
+            int[] allLevelCount = CuboidCLI.calculateAllLevelCount(cubeDesc);
+            preLevelCuboids = allLevelCount[level - 1];
+            thisLevelCuboids = allLevelCount[level];
+        }
+
+        // total reduce input MB
+        double totalReduceInputMB = totalMapInputMB * thisLevelCuboids / preLevelCuboids;
+
+        int numReduceTasks = estimateReducerNumber(totalReduceInputMB, perReduceInputMB, reduceCountRatio, cubeDesc.hasHolisticCountDistinctMeasures(), config.getHadoopJobMaxReducerNumber());
+
+        jobConf.setInt(MAPRED_REDUCE_TASKS, numReduceTasks);
+
+        logger.info("Having total map input MB {}", Math.round(totalMapInputMB));
+        logger.info("Having level {}, pre-level cuboids {}, this level cuboids {}", level, preLevelCuboids, thisLevelCuboids);
+        logger.info("Having per reduce MB {}, reduce count ratio {}", perReduceInputMB, reduceCountRatio);
+        logger.info("Setting {}={}", MAPRED_REDUCE_TASKS, numReduceTasks);
+    }
+
+    /**
+     * Converts an estimated reduce input size into a reducer count.
+     * <p>
+     * The estimate is rounded before the holistic-count-distinct multiplier is applied, then
+     * raised to at least 1 and capped at {@code maxReducers}. The rounded value is clamped into the
+     * non-negative int range first, so very large or infinite estimates (e.g. a per-reducer size of
+     * 0) saturate at the maximum instead of wrapping around when narrowed to an int.
+     */
+    private static int estimateReducerNumber(double totalReduceInputMB, double perReduceInputMB, double reduceCountRatio, boolean hasHolisticCountDistinct, int maxReducers) {
+        long estimate = Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio);
+        // clamp before scaling so the multiplication below cannot overflow
+        estimate = Math.max(0L, Math.min(estimate, (long) Integer.MAX_VALUE));
+
+        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
+        if (hasHolisticCountDistinct) {
+            estimate *= 4;
+        }
+
+        // at least 1 reducer, then no more than the configured maximum
+        estimate = Math.min((long) maxReducers, Math.max(1L, estimate));
+        return (int) estimate;
+    }
+
+    /**
+     * Sets the mapper class used by {@link #run(String[])}; must be called before running the job.
+     *
+     * @param mapperClass the mapperClass to set
+     */
+    @SuppressWarnings("rawtypes")
+    public void setMapperClass(Class<? extends Mapper> mapperClass) {
+        this.mapperClass = mapperClass;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
new file mode 100644
index 0000000..eab967e
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.metadata.measure.MeasureAggregators;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ * @author George Song (ysong1)
+ *
+ */
+public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
+
+ private static final Logger logger = LoggerFactory.getLogger(CuboidReducer.class);
+
+ private String cubeName;
+ private CubeDesc cubeDesc;
+ private List<MeasureDesc> measuresDescs;
+
+ private MeasureCodec codec;
+ private MeasureAggregators aggs;
+
+ private int counter;
+ private Object[] input;
+ private Object[] result;
+
+ private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+ private Text outputValue = new Text();
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ super.bindCurrentConfiguration(context.getConfiguration());
+ cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+ cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
+ measuresDescs = cubeDesc.getMeasures();
+
+ codec = new MeasureCodec(measuresDescs);
+ aggs = new MeasureAggregators(measuresDescs);
+
+ input = new Object[measuresDescs.size()];
+ result = new Object[measuresDescs.size()];
+ }
+
+ @Override
+ public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+
+ aggs.reset();
+
+ for (Text value : values) {
+ codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);
+ aggs.aggregate(input);
+ }
+ aggs.collectStates(result);
+
+ valueBuf.clear();
+ codec.encode(result, valueBuf);
+
+ outputValue.set(valueBuf.array(), 0, valueBuf.position());
+ context.write(key, outputValue);
+
+ counter++;
+ if (counter % BatchConstants.COUNTER_MAX == 0) {
+ logger.info("Handled " + counter + " records!");
+ }
+ }
+
+}