You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/02/11 13:49:49 UTC
[10/51] [partial] kylin git commit: KYLIN-1416 keep only website in
document branch
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
deleted file mode 100644
index 698a978..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop;
-
-/**
- * @author George Song (ysong1)
- *
- */
-
-import static org.apache.hadoop.util.StringUtils.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.Tool;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.RawResource;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.common.util.StringSplitter;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.cmd.ShellCmdOutput;
-import org.apache.kylin.job.exception.JobException;
-import org.apache.kylin.job.tools.OptionsHelper;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("static-access")
-public abstract class AbstractHadoopJob extends Configured implements Tool {
- protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class);
-
- protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For exmaple, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create("jobname");
- protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create("cubename");
- protected static final Option OPTION_II_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("II name. For exmaple, some_ii").create("iiname");
- protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name)").create("segmentname");
- protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename");
- protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
- protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat");
- protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("inputdelim").hasArg().isRequired(false).withDescription("Input delimeter").create("inputdelim");
- protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
- protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level");
- protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input");
- protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename");
-
- protected String name;
- protected boolean isAsync = false;
- protected OptionsHelper optionsHelper = new OptionsHelper();
-
- protected Job job;
-
- protected void parseOptions(Options options, String[] args) throws ParseException {
- optionsHelper.parseOptions(options, args);
- }
-
- public void printUsage(Options options) {
- optionsHelper.printUsage(getClass().getSimpleName(), options);
- }
-
- public Option[] getOptions() {
- return optionsHelper.getOptions();
- }
-
- public String getOptionsAsString() {
- return optionsHelper.getOptionsAsString();
- }
-
- protected String getOptionValue(Option option) {
- return optionsHelper.getOptionValue(option);
- }
-
- protected boolean hasOption(Option option) {
- return optionsHelper.hasOption(option);
- }
-
- protected int waitForCompletion(Job job) throws IOException, InterruptedException, ClassNotFoundException {
- int retVal = 0;
- long start = System.nanoTime();
- if (isAsync) {
- job.submit();
- } else {
- job.waitForCompletion(true);
- retVal = job.isSuccessful() ? 0 : 1;
- logger.debug("Job '" + job.getJobName() + "' finished " + (job.isSuccessful() ? "successfully in " : "with failures. Time taken ") + formatTime((System.nanoTime() - start) / 1000000L));
- }
- return retVal;
- }
-
- private static final String KYLIN_HIVE_DEPENDENCY_JARS = "[^,]*hive-exec.jar|[^,]*hive-metastore.jar|[^,]*hive-hcatalog-core[0-9.-]*jar";
-
- String filterKylinHiveDependency(String kylinHiveDependency) {
- if (StringUtils.isBlank(kylinHiveDependency))
- return "";
-
- StringBuilder jarList = new StringBuilder();
-
- Pattern hivePattern = Pattern.compile(KYLIN_HIVE_DEPENDENCY_JARS);
- Matcher matcher = hivePattern.matcher(kylinHiveDependency);
-
- while (matcher.find()) {
- if (jarList.length() > 0)
- jarList.append(",");
- jarList.append(matcher.group());
- }
-
- return jarList.toString();
- }
-
- private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
-
- protected void setJobClasspath(Job job) {
- String jarPath = KylinConfig.getInstanceFromEnv().getKylinJobJarPath();
- File jarFile = new File(jarPath);
- if (jarFile.exists()) {
- job.setJar(jarPath);
- logger.info("append job jar: " + jarPath);
- } else {
- job.setJarByClass(this.getClass());
- }
-
- String kylinHiveDependency = System.getProperty("kylin.hive.dependency");
- String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency");
- logger.info("append kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH);
-
- Configuration jobConf = job.getConfiguration();
- String classpath = jobConf.get(MAP_REDUCE_CLASSPATH);
- if (classpath == null || classpath.length() == 0) {
- logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job configuration, will run 'mapred classpath' to get the default value.");
- classpath = getDefaultMapRedClasspath();
- classpath = classpath.replace(":", ","); // yarn classpath is comma separated
- logger.info("The default mapred classpath is: " + classpath);
- }
-
- if (kylinHBaseDependency != null) {
- // yarn classpath is comma separated
- kylinHBaseDependency = kylinHBaseDependency.replace(":", ",");
- classpath = classpath + "," + kylinHBaseDependency;
- }
-
- jobConf.set(MAP_REDUCE_CLASSPATH, classpath);
- logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH));
-
- /*
- * set extra dependencies as tmpjars & tmpfiles if configured
- */
- StringBuilder kylinDependency = new StringBuilder();
-
- // for hive dependencies
- if (kylinHiveDependency != null) {
- // yarn classpath is comma separated
- kylinHiveDependency = kylinHiveDependency.replace(":", ",");
-
- logger.info("Hive Dependencies Before Filtered: " + kylinHiveDependency);
- String filteredHive = filterKylinHiveDependency(kylinHiveDependency);
- logger.info("Hive Dependencies After Filtered: " + filteredHive);
-
- if (kylinDependency.length() > 0)
- kylinDependency.append(",");
- kylinDependency.append(filteredHive);
- }
-
- // for KylinJobMRLibDir
- String mrLibDir = KylinConfig.getInstanceFromEnv().getKylinJobMRLibDir();
- if (!StringUtils.isBlank(mrLibDir)) {
- File dirFileMRLIB = new File(mrLibDir);
- if (dirFileMRLIB.exists()) {
- if (kylinDependency.length() > 0)
- kylinDependency.append(",");
- kylinDependency.append(mrLibDir);
- } else {
- logger.info("The directory '" + mrLibDir + "' for 'kylin.job.mr.lib.dir' does not exist!!!");
- }
- }
-
- setJobTmpJarsAndFiles(job, kylinDependency.toString());
- }
-
- private void setJobTmpJarsAndFiles(Job job, String kylinDependency) {
- if (StringUtils.isBlank(kylinDependency))
- return;
-
- String[] fNameList = kylinDependency.split(",");
-
- try {
- Configuration jobConf = job.getConfiguration();
- FileSystem fs = FileSystem.getLocal(jobConf);
-
- StringBuilder jarList = new StringBuilder();
- StringBuilder fileList = new StringBuilder();
-
- for (String fileName : fNameList) {
- Path p = new Path(fileName);
- if (fs.getFileStatus(p).isDirectory()) {
- appendTmpDir(job, fileName);
- continue;
- }
-
- StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList;
- if (list.length() > 0)
- list.append(",");
- list.append(fs.getFileStatus(p).getPath().toString());
- }
-
- appendTmpFiles(fileList.toString(), jobConf);
- appendTmpJars(jarList.toString(), jobConf);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void appendTmpDir(Job job, String tmpDir) {
- if (StringUtils.isBlank(tmpDir))
- return;
-
- try {
- Configuration jobConf = job.getConfiguration();
- FileSystem fs = FileSystem.getLocal(jobConf);
- FileStatus[] fList = fs.listStatus(new Path(tmpDir));
-
- StringBuilder jarList = new StringBuilder();
- StringBuilder fileList = new StringBuilder();
-
- for (FileStatus file : fList) {
- Path p = file.getPath();
- if (fs.getFileStatus(p).isDirectory()) {
- appendTmpDir(job, p.toString());
- continue;
- }
-
- StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList;
- if (list.length() > 0)
- list.append(",");
- list.append(fs.getFileStatus(p).getPath().toString());
- }
-
- appendTmpFiles(fileList.toString(), jobConf);
- appendTmpJars(jarList.toString(), jobConf);
-
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void appendTmpJars(String jarList, Configuration conf) {
- if (StringUtils.isBlank(jarList))
- return;
-
- String tmpJars = conf.get("tmpjars", null);
- if (tmpJars == null) {
- tmpJars = jarList;
- } else {
- tmpJars += "," + jarList;
- }
- conf.set("tmpjars", tmpJars);
- logger.info("Job 'tmpjars' updated -- " + tmpJars);
- }
-
- private void appendTmpFiles(String fileList, Configuration conf) {
- if (StringUtils.isBlank(fileList))
- return;
-
- String tmpFiles = conf.get("tmpfiles", null);
- if (tmpFiles == null) {
- tmpFiles = fileList;
- } else {
- tmpFiles += "," + fileList;
- }
- conf.set("tmpfiles", tmpFiles);
- logger.info("Job 'tmpfiles' updated -- " + tmpFiles);
- }
-
- private String getDefaultMapRedClasspath() {
-
- String classpath = "";
- try {
- CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
- ShellCmdOutput output = new ShellCmdOutput();
- executor.execute("mapred classpath", output);
-
- classpath = output.getOutput().trim();
- } catch (IOException e) {
- logger.error("Failed to run: 'mapred classpath'.", e);
- }
-
- return classpath;
- }
-
- public void addInputDirs(String input, Job job) throws IOException {
- for (String inp : StringSplitter.split(input, ",")) {
- inp = inp.trim();
- if (inp.endsWith("/*")) {
- inp = inp.substring(0, inp.length() - 2);
- FileSystem fs = FileSystem.get(job.getConfiguration());
- Path path = new Path(inp);
- FileStatus[] fileStatuses = fs.listStatus(path);
- boolean hasDir = false;
- for (FileStatus stat : fileStatuses) {
- if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
- hasDir = true;
- addInputDirs(stat.getPath().toString(), job);
- }
- }
- if (fileStatuses.length > 0 && !hasDir) {
- addInputDirs(path.toString(), job);
- }
- } else {
- logger.debug("Add input " + inp);
- FileInputFormat.addInputPath(job, new Path(inp));
- }
- }
- }
-
- protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException {
- File tmp = File.createTempFile("kylin_job_meta", "");
- FileUtils.forceDelete(tmp);
-
- File metaDir = new File(tmp, "meta");
- metaDir.mkdirs();
-
- // write kylin.properties
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- File kylinPropsFile = new File(metaDir, "kylin.properties");
- kylinConfig.writeProperties(kylinPropsFile);
-
- // write cube / model_desc / cube_desc / dict / table
- ArrayList<String> dumpList = new ArrayList<String>();
- dumpList.add(cube.getResourcePath());
- dumpList.add(cube.getDescriptor().getModel().getResourcePath());
- dumpList.add(cube.getDescriptor().getResourcePath());
- for (String tableName : cube.getDescriptor().getModel().getAllTables()) {
- TableDesc table = MetadataManager.getInstance(kylinConfig).getTableDesc(tableName);
- dumpList.add(table.getResourcePath());
- }
-
- for (CubeSegment segment : cube.getSegments()) {
- dumpList.addAll(segment.getDictionaryPaths());
- }
-
- dumpResources(kylinConfig, metaDir, dumpList);
- addToHadoopDistCache(conf, metaDir);
- }
-
- protected void attachKylinPropsAndMetadata(IIInstance ii, Configuration conf) throws IOException {
- File tmp = File.createTempFile("kylin_job_meta", "");
- FileUtils.forceDelete(tmp);
-
- File metaDir = new File(tmp, "meta");
- metaDir.mkdirs();
-
- // write kylin.properties
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- File kylinPropsFile = new File(metaDir, "kylin.properties");
- kylinConfig.writeProperties(kylinPropsFile);
-
- // write II / model_desc / II_desc / dict / table
- ArrayList<String> dumpList = new ArrayList<String>();
- dumpList.add(ii.getResourcePath());
- dumpList.add(ii.getDescriptor().getModel().getResourcePath());
- dumpList.add(ii.getDescriptor().getResourcePath());
-
- for (String tableName : ii.getDescriptor().getModel().getAllTables()) {
- TableDesc table = MetadataManager.getInstance(kylinConfig).getTableDesc(tableName);
- dumpList.add(table.getResourcePath());
- }
-
- for (IISegment segment : ii.getSegments()) {
- dumpList.addAll(segment.getDictionaryPaths());
- }
-
- dumpResources(kylinConfig, metaDir, dumpList);
- addToHadoopDistCache(conf, metaDir);
- }
-
- private void addToHadoopDistCache(Configuration conf, File metaDir) {
- // hadoop distributed cache
- String hdfsMetaDir = OptionsHelper.convertToFileURL(metaDir.getAbsolutePath());
- if (hdfsMetaDir.startsWith("/")) // note Path on windows is like "d:/../..."
- hdfsMetaDir = "file://" + hdfsMetaDir;
- else
- hdfsMetaDir = "file:///" + hdfsMetaDir;
- logger.info("HDFS meta dir is: " + hdfsMetaDir);
-
- appendTmpFiles(hdfsMetaDir, conf);
- }
-
- private void dumpResources(KylinConfig kylinConfig, File metaDir, ArrayList<String> dumpList) throws IOException {
- ResourceStore from = ResourceStore.getStore(kylinConfig);
- KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
- ResourceStore to = ResourceStore.getStore(localConfig);
- for (String path : dumpList) {
- RawResource res = from.getResource(path);
- if (res == null)
- throw new IllegalStateException("No resource found at -- " + path);
- to.putResource(path, res.inputStream, res.timestamp);
- res.inputStream.close();
- }
- }
-
- protected void deletePath(Configuration conf, Path path) throws IOException {
- FileSystem fs = FileSystem.get(path.toUri(), conf);
- if (fs.exists(path)) {
- fs.delete(path, true);
- }
- }
-
- protected double getTotalMapInputMB() throws ClassNotFoundException, IOException, InterruptedException, JobException {
- if (job == null) {
- throw new JobException("Job is null");
- }
-
- long mapInputBytes = 0;
- InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
- for (InputSplit split : input.getSplits(job)) {
- mapInputBytes += split.getLength();
- }
- if (mapInputBytes == 0) {
- throw new IllegalArgumentException("Map input splits are 0 bytes, something is wrong!");
- }
- double totalMapInputMB = (double) mapInputBytes / 1024 / 1024;
- return totalMapInputMB;
- }
-
- protected int getMapInputSplitCount() throws ClassNotFoundException, JobException, IOException, InterruptedException {
- if (job == null) {
- throw new JobException("Job is null");
- }
- InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
- return input.getSplits(job).size();
- }
-
- public static KylinConfig loadKylinPropsAndMetadata(Configuration conf) throws IOException {
- File metaDir = new File("meta");
- System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
- logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- kylinConfig.setMetadataUrl(metaDir.getAbsolutePath());
- return kylinConfig;
- }
-
- protected void cleanupTempConfFile(Configuration conf) {
- String tempMetaFileString = conf.get("tmpfiles");
- logger.info("tempMetaFileString is : " + tempMetaFileString);
- if (tempMetaFileString != null) {
- if (tempMetaFileString.startsWith("file://")) {
- tempMetaFileString = tempMetaFileString.substring("file://".length());
- File tempMetaFile = new File(tempMetaFileString);
- if (tempMetaFile.exists()) {
- try {
- FileUtils.forceDelete(tempMetaFile.getParentFile());
-
- } catch (IOException e) {
- logger.warn("error when deleting " + tempMetaFile, e);
- }
- } else {
- logger.info("" + tempMetaFileString + " does not exist");
- }
- } else {
- logger.info("tempMetaFileString is not starting with file:// :" + tempMetaFileString);
- }
- }
- }
-
- public void kill() throws JobException {
- if (job != null) {
- try {
- job.killJob();
- } catch (IOException e) {
- throw new JobException(e);
- }
- }
- }
-
- public Map<String, String> getInfo() throws JobException {
- if (job != null) {
- Map<String, String> status = new HashMap<String, String>();
- if (null != job.getJobID()) {
- status.put(JobInstance.MR_JOB_ID, job.getJobID().toString());
- }
- if (null != job.getTrackingURL()) {
- status.put(JobInstance.YARN_APP_URL, job.getTrackingURL().toString());
- }
-
- return status;
- } else {
- throw new JobException("Job is null");
- }
- }
-
- public Counters getCounters() throws JobException {
- if (job != null) {
- try {
- return job.getCounters();
- } catch (IOException e) {
- throw new JobException(e);
- }
- } else {
- throw new JobException("Job is null");
- }
- }
-
- public void setAsync(boolean isAsync) {
- this.isAsync = isAsync;
- }
-
- public Job getJob() {
- return this.job;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
deleted file mode 100644
index 787181c..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cardinality;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.kv.RowConstants;
-
-/**
- * @author Jack
- *
- */
-public class ColumnCardinalityMapper<T> extends KylinMapper<T, HCatRecord, IntWritable, BytesWritable> {
-
- private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>();
- public static final String DEFAULT_DELIM = ",";
-
- private int counter = 0;
-
- private HCatSchema schema = null;
- private int columnSize = 0;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.publishConfiguration(context.getConfiguration());
- schema = HCatInputFormat.getTableSchema(context.getConfiguration());
- columnSize = schema.getFields().size();
- }
-
- @Override
- public void map(T key, HCatRecord value, Context context) throws IOException, InterruptedException {
-
- HCatFieldSchema field;
- Object fieldValue;
- for (int m = 0; m < columnSize; m++) {
- field = schema.get(m);
- fieldValue = value.get(field.getName(), schema);
- if (fieldValue == null)
- fieldValue = "NULL";
-
- if (counter < 5 && m < 10) {
- System.out.println("Get row " + counter + " column '" + field.getName() + "' value: " + fieldValue);
- }
-
- if (fieldValue != null)
- getHllc(m).add(Bytes.toBytes(fieldValue.toString()));
- }
-
- counter++;
- }
-
- private HyperLogLogPlusCounter getHllc(Integer key) {
- if (!hllcMap.containsKey(key)) {
- hllcMap.put(key, new HyperLogLogPlusCounter());
- }
- return hllcMap.get(key);
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- Iterator<Integer> it = hllcMap.keySet().iterator();
- while (it.hasNext()) {
- int key = it.next();
- HyperLogLogPlusCounter hllc = hllcMap.get(key);
- ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- buf.clear();
- hllc.writeRegisters(buf);
- buf.flip();
- context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit()));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
deleted file mode 100644
index ab4285a..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cardinality;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.mr.KylinReducer;
-import org.apache.kylin.cube.kv.RowConstants;
-
-/**
- * @author Jack
- *
- */
-public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWritable, IntWritable, LongWritable> {
-
- public static final int ONE = 1;
- private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>();
-
- @Override
- protected void setup(Context context) throws IOException {
- super.publishConfiguration(context.getConfiguration());
- }
-
- @Override
- public void reduce(IntWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
- int skey = key.get();
- for (BytesWritable v : values) {
- ByteBuffer buffer = ByteBuffer.wrap(v.getBytes());
- HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter();
- hll.readRegisters(buffer);
- getHllc(skey).merge(hll);
- hll.clear();
- }
- }
-
- private HyperLogLogPlusCounter getHllc(Integer key) {
- if (!hllcMap.containsKey(key)) {
- hllcMap.put(key, new HyperLogLogPlusCounter());
- }
- return hllcMap.get(key);
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- List<Integer> keys = new ArrayList<Integer>();
- Iterator<Integer> it = hllcMap.keySet().iterator();
- while (it.hasNext()) {
- keys.add(it.next());
- }
- Collections.sort(keys);
- it = keys.iterator();
- while (it.hasNext()) {
- int key = it.next();
- HyperLogLogPlusCounter hllc = hllcMap.get(key);
- ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- buf.clear();
- hllc.writeRegisters(buf);
- buf.flip();
- context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate()));
- // context.write(new Text("ErrorRate_" + key), new
- // LongWritable((long)hllc.getErrorRate()));
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
deleted file mode 100644
index f27d074..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cardinality;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-
-/**
- * This hadoop job will scan all rows of the hive table and then calculate the cardinality on each column.
- * @author shaoshi
- *
- */
-public class HiveColumnCardinalityJob extends AbstractHadoopJob {
- public static final String JOB_TITLE = "Kylin Hive Column Cardinality Job";
-
- @SuppressWarnings("static-access")
- protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table");
-
- public static final String OUTPUT_PATH = BatchConstants.CFG_KYLIN_HDFS_TEMP_DIR + "cardinality";
-
- public HiveColumnCardinalityJob() {
- }
-
- @Override
- public int run(String[] args) throws Exception {
-
- Options options = new Options();
-
- try {
- options.addOption(OPTION_TABLE);
- options.addOption(OPTION_OUTPUT_PATH);
-
- parseOptions(options, args);
-
- // start job
- String jobName = JOB_TITLE + getOptionsAsString();
- System.out.println("Starting: " + jobName);
- Configuration conf = getConf();
-
- JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
- conf.addResource(jobEngineConfig.getHadoopJobConfFilePath(null));
-
- job = Job.getInstance(conf, jobName);
-
- setJobClasspath(job);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- FileOutputFormat.setOutputPath(job, output);
- job.getConfiguration().set("dfs.block.size", "67108864");
-
- // Mapper
- String table = getOptionValue(OPTION_TABLE);
- String[] dbTableNames = HadoopUtil.parseHiveTableName(table);
- HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
-
- job.setInputFormatClass(HCatInputFormat.class);
- job.setMapperClass(ColumnCardinalityMapper.class);
- job.setMapOutputKeyClass(IntWritable.class);
- job.setMapOutputValueClass(BytesWritable.class);
-
- // Reducer - only one
- job.setReducerClass(ColumnCardinalityReducer.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(LongWritable.class);
- job.setNumReduceTasks(1);
-
- this.deletePath(job.getConfiguration(), output);
-
- System.out.println("Going to submit HiveColumnCardinalityJob for table '" + table + "'");
- int result = waitForCompletion(job);
-
- return result;
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java
deleted file mode 100644
index 7bd3814..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cardinality;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.MetadataManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This job will update save the cardinality result into Kylin table metadata store.
- *
- * @author shaoshi
- */
-public class HiveColumnCardinalityUpdateJob extends AbstractHadoopJob {
- public static final String JOB_TITLE = "Kylin Hive Column Cardinality Update Job";
-
- @SuppressWarnings("static-access")
- protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table");
-
- private static final Logger logger = LoggerFactory.getLogger(HiveColumnCardinalityUpdateJob.class);
- private String table;
-
- public HiveColumnCardinalityUpdateJob() {
-
- }
-
- @Override
- public int run(String[] args) throws Exception {
-
- Options options = new Options();
-
- try {
- options.addOption(OPTION_TABLE);
- options.addOption(OPTION_OUTPUT_PATH);
-
- parseOptions(options, args);
-
- this.table = getOptionValue(OPTION_TABLE).toUpperCase();
- // start job
- String jobName = JOB_TITLE + getOptionsAsString();
- logger.info("Starting: " + jobName);
- Configuration conf = getConf();
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
- updateKylinTableExd(table.toUpperCase(), output.toString(), conf);
- return 0;
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
-
- }
-
- public void updateKylinTableExd(String tableName, String outPath, Configuration config) throws IOException {
- List<String> columns = null;
- try {
- columns = readLines(new Path(outPath), config);
- } catch (Exception e) {
- logger.error("Failed to resolve cardinality for " + tableName + " from " + outPath, e);
- return;
- }
-
- StringBuffer cardi = new StringBuffer();
- Iterator<String> it = columns.iterator();
- while (it.hasNext()) {
- String string = it.next();
- String[] ss = StringUtils.split(string, "\t");
-
- if (ss.length != 2) {
- logger.info("The hadoop cardinality value is not valid " + string);
- continue;
- }
- cardi.append(ss[1]);
- cardi.append(",");
- }
- String scardi = cardi.toString();
- if (scardi.length() > 0) {
- scardi = scardi.substring(0, scardi.length() - 1);
- MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
- Map<String, String> tableExd = metaMgr.getTableDescExd(tableName);
- tableExd.put(MetadataConstants.TABLE_EXD_CARDINALITY, scardi);
- metaMgr.saveTableExd(tableName.toUpperCase(), tableExd);
- } else {
- throw new IllegalArgumentException("No cardinality data is collected for table " + tableName);
- }
- }
-
- private static List<String> readLines(Path location, Configuration conf) throws Exception {
- FileSystem fileSystem = FileSystem.get(location.toUri(), conf);
- CompressionCodecFactory factory = new CompressionCodecFactory(conf);
- FileStatus[] items = fileSystem.listStatus(location);
- if (items == null)
- return new ArrayList<String>();
- List<String> results = new ArrayList<String>();
- for (FileStatus item : items) {
-
- // ignoring files like _SUCCESS
- if (item.getPath().getName().startsWith("_")) {
- continue;
- }
-
- CompressionCodec codec = factory.getCodec(item.getPath());
- InputStream stream = null;
-
- // check if we have a compression codec we need to use
- if (codec != null) {
- stream = codec.createInputStream(fileSystem.open(item.getPath()));
- } else {
- stream = fileSystem.open(item.getPath());
- }
-
- StringWriter writer = new StringWriter();
- IOUtils.copy(stream, writer, "UTF-8");
- String raw = writer.toString();
- for (String str : raw.split("\n")) {
- results.add(str);
- }
- }
- return results;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java
deleted file mode 100644
index b600213..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * @author honma
- *
- */
-
-public class BaseCuboidJob extends CuboidJob {
- public BaseCuboidJob() {
- this.setMapperClass(BaseCuboidMapper.class);
- }
-
- public static void main(String[] args) throws Exception {
- CuboidJob job = new BaseCuboidJob();
- int exitCode = ToolRunner.run(job, args);
- System.exit(exitCode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
deleted file mode 100644
index d06963b..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesSplitter;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc;
-import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * @author George Song (ysong1)
- */
-public class BaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, Text> {
-
- private static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapper.class);
-
- public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
- public static final byte[] ONE = Bytes.toBytes("1");
-
- private String cubeName;
- private String segmentName;
- private Cuboid baseCuboid;
- private CubeInstance cube;
- private CubeDesc cubeDesc;
- private CubeSegment cubeSegment;
- private List<byte[]> nullBytes;
-
- private CubeJoinedFlatTableDesc intermediateTableDesc;
- private String intermediateTableRowDelimiter;
- private byte byteRowDelimiter;
-
- private int counter;
- private int errorRecordCounter;
- private Text outputKey = new Text();
- private Text outputValue = new Text();
- protected MeasureIngester<?>[] aggrIngesters;
- protected Map<TblColRef, Dictionary<String>> dictionaryMap;
- private Object[] measures;
- private byte[][] keyBytesBuf;
- private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
- private BytesSplitter bytesSplitter;
- private AbstractRowKeyEncoder rowKeyEncoder;
- private MeasureCodec measureCodec;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.publishConfiguration(context.getConfiguration());
-
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
- intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
- if (Bytes.toBytes(intermediateTableRowDelimiter).length > 1) {
- throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length);
- }
-
- byteRowDelimiter = Bytes.toBytes(intermediateTableRowDelimiter)[0];
-
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
- cube = CubeManager.getInstance(config).getCube(cubeName);
- cubeDesc = cube.getDescriptor();
- cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-
- intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
-
- bytesSplitter = new BytesSplitter(200, 16384);
- rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
-
- measureCodec = new MeasureCodec(cubeDesc.getMeasures());
- measures = new Object[cubeDesc.getMeasures().size()];
-
- int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
- keyBytesBuf = new byte[colCount][];
-
- aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
- dictionaryMap = cubeSegment.buildDictionaryMap();
-
- initNullBytes();
- }
-
- private void initNullBytes() {
- nullBytes = Lists.newArrayList();
- nullBytes.add(HIVE_NULL);
- String[] nullStrings = cubeDesc.getNullStrings();
- if (nullStrings != null) {
- for (String s : nullStrings) {
- nullBytes.add(Bytes.toBytes(s));
- }
- }
- }
-
- private boolean isNull(byte[] v) {
- for (byte[] nullByte : nullBytes) {
- if (Bytes.equals(v, nullByte))
- return true;
- }
- return false;
- }
-
- private byte[] buildKey(SplittedBytes[] splitBuffers) {
- int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
- for (int i = 0; i < baseCuboid.getColumns().size(); i++) {
- int index = rowKeyColumnIndexes[i];
- keyBytesBuf[i] = Arrays.copyOf(splitBuffers[index].value, splitBuffers[index].length);
- if (isNull(keyBytesBuf[i])) {
- keyBytesBuf[i] = null;
- }
- }
- return rowKeyEncoder.encode(keyBytesBuf);
- }
-
- private void buildValue(SplittedBytes[] splitBuffers) {
-
- for (int i = 0; i < measures.length; i++) {
- measures[i] = buildValueOf(i, splitBuffers);
- }
-
- valueBuf.clear();
- measureCodec.encode(measures, valueBuf);
- }
-
- private Object buildValueOf(int idxOfMeasure, SplittedBytes[] splitBuffers) {
- MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure);
- FunctionDesc function = measure.getFunction();
- int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
-
- int paramCount = function.getParameterCount();
- String[] inputToMeasure = new String[paramCount];
-
- // pick up parameter values
- ParameterDesc param = function.getParameter();
- int colParamIdx = 0; // index among parameters of column type
- for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
- String value;
- if (function.isCount()) {
- value = "1";
- } else if (param.isColumnType()) {
- value = getCell(colIdxOnFlatTable[colParamIdx++], splitBuffers);
- } else {
- value = param.getValue();
- }
- inputToMeasure[i] = value;
- }
-
- return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
- }
-
- private String getCell(int i, SplittedBytes[] splitBuffers) {
- byte[] bytes = Arrays.copyOf(splitBuffers[i].value, splitBuffers[i].length);
- if (isNull(bytes))
- return null;
- else
- return Bytes.toString(bytes);
- }
-
- @Override
- public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
- counter++;
- if (counter % BatchConstants.COUNTER_MAX == 0) {
- logger.info("Handled " + counter + " records!");
- }
-
- try {
- bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
- intermediateTableDesc.sanityCheck(bytesSplitter);
-
- byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
- outputKey.set(rowKey, 0, rowKey.length);
-
- buildValue(bytesSplitter.getSplitBuffers());
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
- context.write(outputKey, outputValue);
- } catch (Exception ex) {
- handleErrorRecord(bytesSplitter, ex);
- }
- }
-
- private void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException {
-
- ex.printStackTrace(System.err);
- System.err.println("Insane record: " + bytesSplitter);
-
- errorRecordCounter++;
- if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
- if (ex instanceof IOException)
- throw (IOException) ex;
- else if (ex instanceof RuntimeException)
- throw (RuntimeException) ex;
- else
- throw new RuntimeException("", ex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
deleted file mode 100644
index 3c1e4a5..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author George Song (ysong1)
- */
-public class CubeHFileJob extends AbstractHadoopJob {
-
- protected static final Logger logger = LoggerFactory.getLogger(CubeHFileJob.class);
-
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_HTABLE_NAME);
- parseOptions(options, args);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-
- CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-
- CubeInstance cube = cubeMgr.getCube(cubeName);
- job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-
- setJobClasspath(job);
-
- addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
- FileOutputFormat.setOutputPath(job, output);
-
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(CubeHFileMapper.class);
- job.setReducerClass(KeyValueSortReducer.class);
-
- // set job configuration
- job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- Configuration conf = HBaseConfiguration.create(getConf());
- // add metadata to distributed cache
- attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
- String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
- HTable htable = new HTable(conf, tableName);
-
- //Automatic config !
- HFileOutputFormat.configureIncrementalLoad(job, htable);
-
- // set block replication to 3 for hfiles
- conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
-
- this.deletePath(job.getConfiguration(), output);
-
- return waitForCompletion(job);
- } catch (Exception e) {
- logger.error("error in CubeHFileJob", e);
- printUsage(options);
- throw e;
- } finally {
- if (job != null)
- cleanupTempConfFile(job.getConfiguration());
- }
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new CubeHFileJob(), args);
- System.exit(exitCode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
deleted file mode 100644
index f12d229..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
-import com.google.common.collect.Lists;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWritable, KeyValue> {
-
- ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
-
- String cubeName;
- CubeDesc cubeDesc;
-
- MeasureCodec inputCodec;
- Object[] inputMeasures;
- List<KeyValueCreator> keyValueCreators;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.publishConfiguration(context.getConfiguration());
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
-
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
- CubeManager cubeMgr = CubeManager.getInstance(config);
- cubeDesc = cubeMgr.getCube(cubeName).getDescriptor();
-
- inputCodec = new MeasureCodec(cubeDesc.getMeasures());
- inputMeasures = new Object[cubeDesc.getMeasures().size()];
- keyValueCreators = Lists.newArrayList();
-
- for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
- }
- }
- }
-
- @Override
- public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
- outputKey.set(key.getBytes(), 0, key.getLength());
- KeyValue outputValue;
-
- int n = keyValueCreators.size();
- if (n == 1 && keyValueCreators.get(0).isFullCopy) { // shortcut for
- // simple full copy
-
- outputValue = keyValueCreators.get(0).create(key, value.getBytes(), 0, value.getLength());
- context.write(outputKey, outputValue);
-
- } else { // normal (complex) case that distributes measures to multiple
- // HBase columns
-
- inputCodec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), inputMeasures);
-
- for (int i = 0; i < n; i++) {
- outputValue = keyValueCreators.get(i).create(key, inputMeasures);
- context.write(outputKey, outputValue);
- }
- }
- }
-
- class KeyValueCreator {
- byte[] cfBytes;
- byte[] qBytes;
- long timestamp;
-
- int[] refIndex;
- MeasureDesc[] refMeasures;
-
- MeasureCodec codec;
- Object[] colValues;
- ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
- boolean isFullCopy;
-
- public KeyValueCreator(CubeDesc cubeDesc, HBaseColumnDesc colDesc) {
-
- cfBytes = Bytes.toBytes(colDesc.getColumnFamilyName());
- qBytes = Bytes.toBytes(colDesc.getQualifier());
- timestamp = 0; // use 0 for timestamp
-
- List<MeasureDesc> measures = cubeDesc.getMeasures();
- String[] measureNames = getMeasureNames(cubeDesc);
- String[] refs = colDesc.getMeasureRefs();
-
- refIndex = new int[refs.length];
- refMeasures = new MeasureDesc[refs.length];
- for (int i = 0; i < refs.length; i++) {
- refIndex[i] = indexOf(measureNames, refs[i]);
- refMeasures[i] = measures.get(refIndex[i]);
- }
-
- codec = new MeasureCodec(refMeasures);
- colValues = new Object[refs.length];
-
- isFullCopy = true;
- for (int i = 0; i < measures.size(); i++) {
- if (refIndex.length <= i || refIndex[i] != i)
- isFullCopy = false;
- }
- }
-
- public KeyValue create(Text key, Object[] measureValues) {
- for (int i = 0; i < colValues.length; i++) {
- colValues[i] = measureValues[refIndex[i]];
- }
-
- valueBuf.clear();
- codec.encode(colValues, valueBuf);
-
- return create(key, valueBuf.array(), 0, valueBuf.position());
- }
-
- public KeyValue create(Text key, byte[] value, int voffset, int vlen) {
- return new KeyValue(key.getBytes(), 0, key.getLength(), //
- cfBytes, 0, cfBytes.length, //
- qBytes, 0, qBytes.length, //
- timestamp, Type.Put, //
- value, voffset, vlen);
- }
-
- private int indexOf(String[] measureNames, String ref) {
- for (int i = 0; i < measureNames.length; i++)
- if (measureNames[i].equalsIgnoreCase(ref))
- return i;
-
- throw new IllegalArgumentException("Measure '" + ref + "' not found in " + Arrays.toString(measureNames));
- }
-
- private String[] getMeasureNames(CubeDesc cubeDesc) {
- List<MeasureDesc> measures = cubeDesc.getMeasures();
- String[] result = new String[measures.size()];
- for (int i = 0; i < measures.size(); i++)
- result[i] = measures.get(i).getName();
- return result;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
deleted file mode 100644
index e4875e9..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.CuboidCLI;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.exception.JobException;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author ysong1
- */
-public class CuboidJob extends AbstractHadoopJob {
-
- protected static final Logger logger = LoggerFactory.getLogger(CuboidJob.class);
- private static final String MAPRED_REDUCE_TASKS = "mapred.reduce.tasks";
-
- @SuppressWarnings("rawtypes")
- private Class<? extends Mapper> mapperClass;
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_SEGMENT_NAME);
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_NCUBOID_LEVEL);
- options.addOption(OPTION_INPUT_FORMAT);
- parseOptions(options, args);
-
- Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL));
- String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
-
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- CubeManager cubeMgr = CubeManager.getInstance(config);
- CubeInstance cube = cubeMgr.getCube(cubeName);
-
- job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
- logger.info("Starting: " + job.getJobName());
- FileInputFormat.setInputPaths(job, input);
-
- setJobClasspath(job);
-
- // Mapper
- if (this.mapperClass == null) {
- throw new Exception("Mapper class is not set!");
- }
-
- boolean isInputTextFormat = false;
- if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) {
- isInputTextFormat = true;
- }
-
- if (isInputTextFormat) {
- job.setInputFormatClass(TextInputFormat.class);
-
- } else {
- job.setInputFormatClass(SequenceFileInputFormat.class);
- }
- job.setMapperClass(this.mapperClass);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Text.class);
- job.setCombinerClass(CuboidReducer.class); // for base cuboid shuffle skew, some rowkey aggregates far more records than others
-
- // Reducer
- job.setReducerClass(CuboidReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
-
- FileOutputFormat.setOutputPath(job, output);
-
- // set job configuration
- job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
- job.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, nCuboidLevel);
-
- // add metadata to distributed cache
- attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
- setReduceTaskNum(job, config, cubeName, nCuboidLevel);
-
- this.deletePath(job.getConfiguration(), output);
-
- return waitForCompletion(job);
- } catch (Exception e) {
- logger.error("error in CuboidJob", e);
- printUsage(options);
- throw e;
- } finally {
- if (job != null)
- cleanupTempConfFile(job.getConfiguration());
- }
- }
-
- protected void setReduceTaskNum(Job job, KylinConfig config, String cubeName, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
- Configuration jobConf = job.getConfiguration();
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-
- CubeDesc cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
-
- double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
- double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-
- // total map input MB
- double totalMapInputMB = this.getTotalMapInputMB();
-
- // output / input ratio
- int preLevelCuboids, thisLevelCuboids;
- if (level == 0) { // base cuboid
- preLevelCuboids = thisLevelCuboids = 1;
- } else { // n-cuboid
- int[] allLevelCount = CuboidCLI.calculateAllLevelCount(cubeDesc);
- preLevelCuboids = allLevelCount[level - 1];
- thisLevelCuboids = allLevelCount[level];
- }
-
- // total reduce input MB
- double totalReduceInputMB = totalMapInputMB * thisLevelCuboids / preLevelCuboids;
-
- // number of reduce tasks
- int numReduceTasks = (int) Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio);
-
- // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
- if (cubeDesc.hasMemoryHungryMeasures()) {
- numReduceTasks = numReduceTasks * 4;
- }
-
- // at least 1 reducer
- numReduceTasks = Math.max(1, numReduceTasks);
- // no more than 5000 reducer by default
- numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
- jobConf.setInt(MAPRED_REDUCE_TASKS, numReduceTasks);
-
- logger.info("Having total map input MB " + Math.round(totalMapInputMB));
- logger.info("Having level " + level + ", pre-level cuboids " + preLevelCuboids + ", this level cuboids " + thisLevelCuboids);
- logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio);
- logger.info("Setting " + MAPRED_REDUCE_TASKS + "=" + numReduceTasks);
- }
-
- /**
- * @param mapperClass
- * the mapperClass to set
- */
- @SuppressWarnings("rawtypes")
- public void setMapperClass(Class<? extends Mapper> mapperClass) {
- this.mapperClass = mapperClass;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
deleted file mode 100644
index 3859d0e..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinReducer;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.measure.MeasureAggregators;
-import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
-
- private static final Logger logger = LoggerFactory.getLogger(CuboidReducer.class);
-
- private String cubeName;
- private CubeDesc cubeDesc;
- private List<MeasureDesc> measuresDescs;
-
- private MeasureCodec codec;
- private MeasureAggregators aggs;
-
- private int counter;
- private int cuboidLevel;
- private boolean[] needAggr;
- private Object[] input;
- private Object[] result;
-
- private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- private Text outputValue = new Text();
-
- @Override
- protected void setup(Context context) throws IOException {
- super.publishConfiguration(context.getConfiguration());
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- // only used in Build job, not in Merge job
- cuboidLevel = context.getConfiguration().getInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, 0);
-
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
- cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
- measuresDescs = cubeDesc.getMeasures();
-
- codec = new MeasureCodec(measuresDescs);
- aggs = new MeasureAggregators(measuresDescs);
-
- input = new Object[measuresDescs.size()];
- result = new Object[measuresDescs.size()];
- needAggr = new boolean[measuresDescs.size()];
-
- if (cuboidLevel > 0) {
- for (int i = 0; i < measuresDescs.size(); i++) {
- needAggr[i] = !measuresDescs.get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid();
- }
- }
- }
-
- @Override
- public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-
- aggs.reset();
-
- for (Text value : values) {
- codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);
- if (cuboidLevel > 0) {
- aggs.aggregate(input, needAggr);
- } else {
- aggs.aggregate(input);
- }
- }
- aggs.collectStates(result);
-
- valueBuf.clear();
- codec.encode(result, valueBuf);
-
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
- context.write(key, outputValue);
-
- counter++;
- if (counter % BatchConstants.COUNTER_MAX == 0) {
- logger.info("Handled " + counter + " records!");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java
deleted file mode 100644
index 9792463..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.HashSet;
-
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.mr.KylinReducer;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsCombiner extends KylinReducer<ShortWritable, Text, ShortWritable, Text> {
-
- private Text outputValue = new Text();
-
- @Override
- protected void setup(Context context) throws IOException {
- super.publishConfiguration(context.getConfiguration());
- }
-
- @Override
- public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-
- HashSet<ByteArray> set = new HashSet<ByteArray>();
- for (Text textValue : values) {
- ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
- set.add(value);
- }
-
- for (ByteArray value : set) {
- outputValue.set(value.data);
- context.write(key, outputValue);
- }
- }
-
-}