You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/16 05:05:19 UTC
[5/5] incubator-kylin git commit: KYLIN-1010 Job module with only II and tests left
KYLIN-1010 Job module with only II and tests left
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4456bb1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4456bb1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4456bb1d
Branch: refs/heads/2.x-staging
Commit: 4456bb1ddcf249221a1cb1df8186dbbe26e39b4d
Parents: d1f0d33
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Sep 15 19:08:15 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Wed Sep 16 11:04:24 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/job/cmd/BaseCommandOutput.java | 29 --
.../apache/kylin/job/cmd/ICommandOutput.java | 44 --
.../org/apache/kylin/job/cmd/IJobCommand.java | 32 --
.../java/org/apache/kylin/job/cmd/ShellCmd.java | 108 -----
.../apache/kylin/job/cmd/ShellCmdOutput.java | 83 ----
.../engine/mr/common/AbstractHadoopJob.java | 9 +-
.../cardinality/ColumnCardinalityMapper.java | 111 -----
.../cardinality/ColumnCardinalityReducer.java | 92 ----
.../cardinality/HiveColumnCardinalityJob.java | 107 -----
.../HiveColumnCardinalityUpdateJob.java | 154 -------
.../job/hadoop/cube/NewBaseCuboidMapper.java | 348 ---------------
.../job/hadoop/cube/OrphanHBaseCleanJob.java | 134 ------
.../job/hadoop/cube/StorageCleanupJob.java | 323 --------------
.../apache/kylin/job/monitor/MonitorCLI.java | 69 ---
.../kylin/job/monitor/StreamingMonitor.java | 154 -------
.../kylin/job/streaming/BootstrapConfig.java | 74 ----
.../kylin/job/streaming/CubeStreamConsumer.java | 141 ------
.../kylin/job/streaming/KafkaDataLoader.java | 54 ---
.../kylin/job/streaming/StreamingBootstrap.java | 402 -----------------
.../kylin/job/streaming/StreamingCLI.java | 104 -----
.../apache/kylin/job/tools/CleanHtableCLI.java | 73 ---
.../kylin/job/tools/CubeMigrationCLI.java | 439 -------------------
.../job/tools/GridTableHBaseBenchmark.java | 391 -----------------
.../kylin/job/tools/HbaseStreamingInput.java | 237 ----------
.../kylin/job/tools/HtableAlterMetadataCLI.java | 88 ----
.../org/apache/kylin/job/tools/KafkaVerify.java | 101 -----
.../apache/kylin/job/tools/RowCounterCLI.java | 71 ---
.../kylin/job/tools/StreamingInputAnalyzer.java | 242 ----------
.../kylin/job/tools/StreamingLogsAnalyser.java | 96 ----
.../org/apache/kylin/job/tools/TarGZUtil.java | 69 ---
.../apache/kylin/job/tools/TimeHistogram.java | 85 ----
.../kylin/job/BuildCubeWithStreamTest.java | 2 +-
.../apache/kylin/job/BuildIIWithEngineTest.java | 2 +-
.../job/tools/ColumnCardinalityReducerTest.java | 4 +-
.../apache/kylin/rest/service/AdminService.java | 2 +-
.../apache/kylin/rest/service/CubeService.java | 4 +-
.../apache/kylin/source/hive/HiveMRInput.java | 7 +-
.../cardinality/ColumnCardinalityMapper.java | 111 +++++
.../cardinality/ColumnCardinalityReducer.java | 92 ++++
.../cardinality/HiveColumnCardinalityJob.java | 107 +++++
.../HiveColumnCardinalityUpdateJob.java | 154 +++++++
.../kylin/source/kafka/util/KafkaVerify.java | 101 +++++
.../storage/hbase/steps/DeprecatedGCStep.java | 4 +-
.../storage/hbase/util/CleanHtableCLI.java | 73 +++
.../storage/hbase/util/CubeMigrationCLI.java | 439 +++++++++++++++++++
.../hbase/util/GridTableHBaseBenchmark.java | 391 +++++++++++++++++
.../storage/hbase/util/HbaseStreamingInput.java | 237 ++++++++++
.../hbase/util/HtableAlterMetadataCLI.java | 88 ++++
.../storage/hbase/util/OrphanHBaseCleanJob.java | 134 ++++++
.../kylin/storage/hbase/util/RowCounterCLI.java | 71 +++
.../storage/hbase/util/StorageCleanupJob.java | 315 +++++++++++++
.../kylin/storage/hbase/util/TarGZUtil.java | 69 +++
.../apache/kylin/job/monitor/MonitorCLI.java | 69 +++
.../kylin/job/monitor/StreamingMonitor.java | 154 +++++++
.../kylin/job/streaming/BootstrapConfig.java | 74 ++++
.../kylin/job/streaming/CubeStreamConsumer.java | 141 ++++++
.../kylin/job/streaming/KafkaDataLoader.java | 54 +++
.../kylin/job/streaming/StreamingBootstrap.java | 402 +++++++++++++++++
.../kylin/job/streaming/StreamingCLI.java | 104 +++++
.../streaming/util/StreamingInputAnalyzer.java | 242 ++++++++++
.../streaming/util/StreamingLogsAnalyser.java | 96 ++++
.../kylin/streaming/util/TimeHistogram.java | 85 ++++
62 files changed, 3816 insertions(+), 4476 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
deleted file mode 100644
index 29b5324..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-/**
- */
-public abstract class BaseCommandOutput implements ICommandOutput {
-
- @Override
- public void log(String message) {
- this.appendOutput(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
deleted file mode 100644
index 6cab6a3..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.common.util.Logger;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-
-/**
- * @author xjiang
- *
- */
-public interface ICommandOutput extends Logger {
-
- public void setStatus(JobStepStatusEnum status);
-
- public JobStepStatusEnum getStatus();
-
- public void appendOutput(String message);
-
- public String getOutput();
-
- public void setExitCode(int exitCode);
-
- public int getExitCode();
-
- public void reset();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java b/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
deleted file mode 100644
index 5a47173..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.job.exception.JobException;
-
-/**
- * @author xjiang
- *
- */
-public interface IJobCommand {
-
- public ICommandOutput execute() throws JobException;
-
- public void cancel() throws JobException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
deleted file mode 100644
index b93c058..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.exception.JobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang
- *
- */
-public class ShellCmd implements IJobCommand {
-
- private static Logger logger = LoggerFactory.getLogger(ShellCmd.class);
-
- private final String executeCommand;
- private final ICommandOutput output;
- private final boolean isAsync;
- private final CliCommandExecutor cliCommandExecutor;
-
- private FutureTask<Integer> future;
-
- private ShellCmd(String executeCmd, ICommandOutput out, String host, int port, String user, String password, boolean async) {
- this.executeCommand = executeCmd;
- this.output = out;
- this.cliCommandExecutor = new CliCommandExecutor();
- this.cliCommandExecutor.setRunAtRemote(host, port, user, password);
- this.isAsync = async;
- }
-
- public ShellCmd(String executeCmd, String host, int port, String user, String password, boolean async) {
- this(executeCmd, new ShellCmdOutput(), host, port, user, password, async);
- }
-
- @Override
- public ICommandOutput execute() throws JobException {
-
- final ExecutorService executor = Executors.newSingleThreadExecutor();
- future = new FutureTask<Integer>(new Callable<Integer>() {
- public Integer call() throws JobException, IOException {
- executor.shutdown();
- return executeCommand(executeCommand);
- }
- });
- executor.execute(future);
-
- int exitCode = -1;
- if (!isAsync) {
- try {
- exitCode = future.get();
- logger.info("finish executing");
- } catch (CancellationException e) {
- logger.debug("Command is cancelled");
- exitCode = -2;
- } catch (Exception e) {
- throw new JobException("Error when execute job " + executeCommand, e);
- } finally {
- if (exitCode == 0) {
- output.setStatus(JobStepStatusEnum.FINISHED);
- } else if (exitCode == -2) {
- output.setStatus(JobStepStatusEnum.DISCARDED);
- } else {
- output.setStatus(JobStepStatusEnum.ERROR);
- }
- output.setExitCode(exitCode);
- }
- }
- return output;
- }
-
- protected int executeCommand(String command) throws JobException, IOException {
- output.reset();
- output.setStatus(JobStepStatusEnum.RUNNING);
- return cliCommandExecutor.execute(command, output).getFirst();
- }
-
- @Override
- public void cancel() throws JobException {
- future.cancel(true);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
deleted file mode 100644
index 44609c2..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang
- *
- */
-public class ShellCmdOutput extends BaseCommandOutput implements ICommandOutput {
-
- protected static final Logger logger = LoggerFactory.getLogger(ShellCmdOutput.class);
-
- protected StringBuilder output;
- protected int exitCode;
- protected JobStepStatusEnum status;
-
- public ShellCmdOutput() {
- init();
- }
-
- private void init() {
- output = new StringBuilder();
- exitCode = -1;
- status = JobStepStatusEnum.NEW;
- }
-
- @Override
- public JobStepStatusEnum getStatus() {
- return status;
- }
-
- @Override
- public void setStatus(JobStepStatusEnum s) {
- this.status = s;
- }
-
- @Override
- public String getOutput() {
- return output.toString();
- }
-
- @Override
- public void appendOutput(String message) {
- output.append(message).append(System.getProperty("line.separator"));
- logger.debug(message);
- }
-
- @Override
- public int getExitCode() {
- return exitCode;
- }
-
- @Override
- public void setExitCode(int code) {
- exitCode = code;
- }
-
- @Override
- public void reset() {
- init();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index cd9393a..cf492ef 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -23,7 +23,7 @@ package org.apache.kylin.engine.mr.common;
*
*/
-import static org.apache.hadoop.util.StringUtils.formatTime;
+import static org.apache.hadoop.util.StringUtils.*;
import java.io.File;
import java.io.IOException;
@@ -58,7 +58,6 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.cmd.ShellCmdOutput;
import org.apache.kylin.job.common.OptionsHelper;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.metadata.MetadataManager;
@@ -189,10 +188,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
String classpath = "";
try {
CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
- ShellCmdOutput output = new ShellCmdOutput();
- executor.execute("mapred classpath", output);
-
- classpath = output.getOutput().trim().replace(':', ',');
+ String output = executor.execute("mapred classpath").getSecond();
+ classpath = output.trim().replace(':', ',');
} catch (IOException e) {
logger.error("Failed to run: 'mapred classpath'.", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
deleted file mode 100644
index 9b77a47..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cardinality;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-
-/**
- * @author Jack
- *
- */
-public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritable, BytesWritable> {
-
- private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>();
- public static final String DEFAULT_DELIM = ",";
-
- private int counter = 0;
-
- private TableDesc tableDesc;
- private IMRTableInputFormat tableInputFormat;
-
- @Override
- protected void setup(Context context) throws IOException {
- Configuration conf = context.getConfiguration();
- bindCurrentConfiguration(conf);
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
- String tableName = conf.get(BatchConstants.TABLE_NAME);
- tableDesc = MetadataManager.getInstance(config).getTableDesc(tableName);
- tableInputFormat = MRUtil.getTableInputFormat(tableDesc);
- }
-
- @Override
- public void map(T key, Object value, Context context) throws IOException, InterruptedException {
- ColumnDesc[] columns = tableDesc.getColumns();
- String[] values = tableInputFormat.parseMapperInput(value);
-
- for (int m = 0; m < columns.length; m++) {
- String field = columns[m].getName();
- String fieldValue = values[m];
- if (fieldValue == null)
- fieldValue = "NULL";
-
- if (counter < 5 && m < 10) {
- System.out.println("Get row " + counter + " column '" + field + "' value: " + fieldValue);
- }
-
- if (fieldValue != null)
- getHllc(m).add(Bytes.toBytes(fieldValue.toString()));
- }
-
- counter++;
- }
-
- private HyperLogLogPlusCounter getHllc(Integer key) {
- if (!hllcMap.containsKey(key)) {
- hllcMap.put(key, new HyperLogLogPlusCounter());
- }
- return hllcMap.get(key);
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- Iterator<Integer> it = hllcMap.keySet().iterator();
- ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- while (it.hasNext()) {
- int key = it.next();
- HyperLogLogPlusCounter hllc = hllcMap.get(key);
- buf.clear();
- hllc.writeRegisters(buf);
- buf.flip();
- context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit()));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
deleted file mode 100644
index 6953235..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cardinality;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.engine.mr.KylinReducer;
-
-/**
- * @author Jack
- *
- */
-public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWritable, IntWritable, LongWritable> {
-
- public static final int ONE = 1;
- private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>();
-
- @Override
- protected void setup(Context context) throws IOException {
- super.bindCurrentConfiguration(context.getConfiguration());
- }
-
- @Override
- public void reduce(IntWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
- int skey = key.get();
- for (BytesWritable v : values) {
- ByteBuffer buffer = ByteBuffer.wrap(v.getBytes());
- HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter();
- hll.readRegisters(buffer);
- getHllc(skey).merge(hll);
- hll.clear();
- }
- }
-
- private HyperLogLogPlusCounter getHllc(Integer key) {
- if (!hllcMap.containsKey(key)) {
- hllcMap.put(key, new HyperLogLogPlusCounter());
- }
- return hllcMap.get(key);
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- List<Integer> keys = new ArrayList<Integer>();
- Iterator<Integer> it = hllcMap.keySet().iterator();
- while (it.hasNext()) {
- keys.add(it.next());
- }
- Collections.sort(keys);
- it = keys.iterator();
- while (it.hasNext()) {
- int key = it.next();
- HyperLogLogPlusCounter hllc = hllcMap.get(key);
- ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- buf.clear();
- hllc.writeRegisters(buf);
- buf.flip();
- context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate()));
- // context.write(new Text("ErrorRate_" + key), new
- // LongWritable((long)hllc.getErrorRate()));
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
deleted file mode 100644
index 89d20cf..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cardinality;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-
-/**
- * This hadoop job will scan all rows of the hive table and then calculate the cardinality on each column.
- * @author shaoshi
- *
- */
-public class HiveColumnCardinalityJob extends AbstractHadoopJob {
- public static final String JOB_TITLE = "Kylin Hive Column Cardinality Job";
-
- @SuppressWarnings("static-access")
- protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table");
-
- public static final String OUTPUT_PATH = BatchConstants.CFG_KYLIN_HDFS_TEMP_DIR + "cardinality";
-
- public HiveColumnCardinalityJob() {
- }
-
- @Override
- public int run(String[] args) throws Exception {
-
- Options options = new Options();
-
- try {
- options.addOption(OPTION_TABLE);
- options.addOption(OPTION_OUTPUT_PATH);
-
- parseOptions(options, args);
-
- // start job
- String jobName = JOB_TITLE + getOptionsAsString();
- System.out.println("Starting: " + jobName);
- Configuration conf = getConf();
- job = Job.getInstance(conf, jobName);
-
- setJobClasspath(job);
-
- String table = getOptionValue(OPTION_TABLE);
- job.getConfiguration().set(BatchConstants.TABLE_NAME, table);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- FileOutputFormat.setOutputPath(job, output);
- job.getConfiguration().set("dfs.block.size", "67108864");
-
- // Mapper
- IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table);
- tableInputFormat.configureJob(job);
-
- job.setMapperClass(ColumnCardinalityMapper.class);
- job.setMapOutputKeyClass(IntWritable.class);
- job.setMapOutputValueClass(BytesWritable.class);
-
- // Reducer - only one
- job.setReducerClass(ColumnCardinalityReducer.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(LongWritable.class);
- job.setNumReduceTasks(1);
-
- this.deletePath(job.getConfiguration(), output);
-
- System.out.println("Going to submit HiveColumnCardinalityJob for table '" + table + "'");
- int result = waitForCompletion(job);
-
- return result;
- } catch (Exception e) {
- printUsage(options);
- throw e;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java
deleted file mode 100644
index 6f8959b..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cardinality;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.MetadataManager;
-
-/**
- * Saves the result of a column cardinality job into the Kylin table metadata store.
- *
- * <p>The output directory is expected to hold tab-separated "column-index&lt;TAB&gt;cardinality" text
- * lines, such as those written by HiveColumnCardinalityJob (IntWritable key, LongWritable value,
- * TextOutputFormat).</p>
- *
- * @author shaoshi
- */
-public class HiveColumnCardinalityUpdateJob extends AbstractHadoopJob {
-    public static final String JOB_TITLE = "Kylin Hive Column Cardinality Update Job";
-
-    @SuppressWarnings("static-access")
-    protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table");
-
-    private String table;
-
-    public HiveColumnCardinalityUpdateJob() {
-
-    }
-
-    /**
-     * Parses {@code -table} and {@code -output} and saves the cardinality found under the output path.
-     *
-     * @return 0; note that a failure to read the output is only reported, not turned into a non-zero code
-     * @throws Exception if option parsing or the metadata update fails (usage is printed first)
-     */
-    @Override
-    public int run(String[] args) throws Exception {
-
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_TABLE);
-            options.addOption(OPTION_OUTPUT_PATH);
-
-            parseOptions(options, args);
-
-            this.table = getOptionValue(OPTION_TABLE).toUpperCase();
-            // start job
-            String jobName = JOB_TITLE + getOptionsAsString();
-            System.out.println("Starting: " + jobName);
-            Configuration conf = getConf();
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
-            updateKylinTableExd(table.toUpperCase(), output.toString(), conf);
-            return 0;
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-
-    }
-
-    /**
-     * Reads the cardinality files under {@code outPath} and stores the second tab-separated field of
-     * every valid line, comma-joined in read order, as the table's
-     * {@code MetadataConstants.TABLE_EXD_CARDINALITY} property.
-     *
-     * <p>If the output cannot be read, or contains no valid line, the problem is reported on stdout
-     * and the table metadata is left untouched.</p>
-     *
-     * @param tableName table whose extended metadata is updated
-     * @param outPath directory holding the cardinality job output
-     * @param config Hadoop configuration used to access the file system
-     * @throws IOException if saving the table metadata fails
-     */
-    public void updateKylinTableExd(String tableName, String outPath, Configuration config) throws IOException {
-        List<String> lines;
-        try {
-            lines = readLines(new Path(outPath), config);
-        } catch (Exception e) {
-            e.printStackTrace();
-            System.out.println("Failed to resolve cardinality for " + tableName + " from " + outPath);
-            return;
-        }
-
-        StringBuilder cardi = new StringBuilder();
-        for (String line : lines) {
-            String[] ss = StringUtils.split(line, "\t");
-
-            if (ss.length != 2) {
-                System.out.println("The hadoop cardinality value is not valid " + line);
-                continue;
-            }
-            // separator goes before every value but the first, so no trailing comma has to be trimmed
-            if (cardi.length() > 0) {
-                cardi.append(",");
-            }
-            cardi.append(ss[1]);
-        }
-
-        // nothing valid to save: trimming a trailing comma off an empty buffer used to crash here
-        if (cardi.length() == 0) {
-            System.out.println("No valid cardinality found for " + tableName + " in " + outPath + ", table metadata is not updated");
-            return;
-        }
-
-        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-        Map<String, String> tableExd = metaMgr.getTableDescExd(tableName);
-        tableExd.put(MetadataConstants.TABLE_EXD_CARDINALITY, cardi.toString());
-        metaMgr.saveTableExd(tableName.toUpperCase(), tableExd);
-    }
-
-    /**
-     * Returns every line of every file directly under {@code location}, skipping files whose name
-     * starts with "_" (e.g. _SUCCESS) and decompressing files for which a codec is registered.
-     */
-    private static List<String> readLines(Path location, Configuration conf) throws Exception {
-        FileSystem fileSystem = FileSystem.get(location.toUri(), conf);
-        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
-        FileStatus[] items = fileSystem.listStatus(location);
-        List<String> results = new ArrayList<String>();
-        if (items == null)
-            return results;
-        for (FileStatus item : items) {
-
-            // ignoring files like _SUCCESS
-            if (item.getPath().getName().startsWith("_")) {
-                continue;
-            }
-
-            // check if we have a compression codec we need to use
-            CompressionCodec codec = factory.getCodec(item.getPath());
-
-            // try-with-resources: the stream used to be left open, leaking one file handle per file
-            try (InputStream stream = codec != null ? codec.createInputStream(fileSystem.open(item.getPath())) : fileSystem.open(item.getPath())) {
-                StringWriter writer = new StringWriter();
-                IOUtils.copy(stream, writer, "UTF-8");
-                String raw = writer.toString();
-                for (String str : raw.split("\n")) {
-                    results.add(str);
-                }
-            }
-        }
-        return results;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
deleted file mode 100644
index 0cc6f71..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesSplitter;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.lookup.LookupBytesTable;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.SourceFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author George Song (ysong1),honma
- */
-public class NewBaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, Text> {
-
- private static final Logger logger = LoggerFactory.getLogger(NewBaseCuboidMapper.class);
-
- private String cubeName;
- private String segmentName;
- private Cuboid baseCuboid;
- private CubeInstance cube;
- private CubeSegment cubeSegment;
-
- private CubeDesc cubeDesc;
- private MetadataManager metadataManager;
- private TableDesc factTableDesc;
-
- private boolean byteRowDelimiterInferred = false;
- private byte byteRowDelimiter;
-
- private int counter;
- private Text outputKey = new Text();
- private Text outputValue = new Text();
- private Object[] measures;
- private byte[][] keyBytesBuf;
- private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
- private BytesSplitter bytesSplitter;
- private AbstractRowKeyEncoder rowKeyEncoder;
- private MeasureCodec measureCodec;
-
- // deal with table join
- private HashMap<String, LookupBytesTable> lookupTables;// name -> table
- private LinkedList<TableJoin> tableJoins;
- private LinkedList<Pair<Integer, Integer>> factTblColAsRowKey;// similar as
- // TableJoin.dimTblColAsRowKey
- private int[][] measureColumnIndice;
- private byte[] nullValue;
-
- private class TableJoin {
- public LinkedList<Integer> fkIndice;// zero-based join columns on fact
- // table
- public String lookupTableName;
- public String joinType;
-
- // Pair.first -> zero-based column index in lookup table
- // Pair.second -> zero based row key index
- public LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey;
-
- private TableJoin(String joinType, LinkedList<Integer> fkIndice, String lookupTableName, LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey) {
- this.joinType = joinType;
- this.fkIndice = fkIndice;
- this.lookupTableName = lookupTableName;
- this.dimTblColAsRowKey = dimTblColAsRowKey;
- }
- }
-
- @Override
- protected void setup(Context context) throws IOException {
- super.bindCurrentConfiguration(context.getConfiguration());
-
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
-
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
- metadataManager = MetadataManager.getInstance(config);
- cube = CubeManager.getInstance(config).getCube(cubeName);
- cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
- cubeDesc = cube.getDescriptor();
- factTableDesc = cubeDesc.getFactTableDesc();
-
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-
- // intermediateTableDesc = new
- // JoinedFlatTableDesc(cube.getDescriptor());
-
- rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
-
- measureCodec = new MeasureCodec(cubeDesc.getMeasures());
- measures = new Object[cubeDesc.getMeasures().size()];
-
- int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
- keyBytesBuf = new byte[colCount][];
-
- bytesSplitter = new BytesSplitter(factTableDesc.getColumns().length, 4096);
-
- nullValue = new byte[] { (byte) '\\', (byte) 'N' };// As in Hive, null
- // value is
- // represented by \N
-
- prepareJoins();
- prepareMetrics();
- }
-
- private void prepareJoins() throws IOException {
- this.lookupTables = new HashMap<String, LookupBytesTable>();
- this.tableJoins = new LinkedList<TableJoin>();
- this.factTblColAsRowKey = new LinkedList<Pair<Integer, Integer>>();
-
- for (DimensionDesc dim : cubeDesc.getDimensions()) {
- JoinDesc join = dim.getJoin();
- if (join != null) {
- String joinType = join.getType().toUpperCase();
- String lookupTableName = dim.getTable();
-
- // load lookup tables
- if (!lookupTables.containsKey(lookupTableName)) {
- TableDesc tableDesc = metadataManager.getTableDesc(lookupTableName);
- ReadableTable htable = SourceFactory.createReadableTable(tableDesc);
- LookupBytesTable btable = new LookupBytesTable(tableDesc, join.getPrimaryKey(), htable);
- lookupTables.put(lookupTableName, btable);
- }
-
- // create join infos
- LinkedList<Integer> fkIndice = new LinkedList<Integer>();
- for (TblColRef colRef : join.getForeignKeyColumns()) {
- fkIndice.add(colRef.getColumnDesc().getZeroBasedIndex());
- }
- this.tableJoins.add(new TableJoin(joinType, fkIndice, lookupTableName, this.findColumnRowKeyRelationships(dim)));
-
- } else {
-
- this.factTblColAsRowKey.addAll(this.findColumnRowKeyRelationships(dim));
- }
- }
-
- // put composite keys joins ahead of single key joins
- Collections.sort(tableJoins, new Comparator<TableJoin>() {
- @Override
- public int compare(TableJoin o1, TableJoin o2) {
- return Integer.valueOf(o2.fkIndice.size()).compareTo(Integer.valueOf(o1.fkIndice.size()));
- }
- });
- }
-
- private LinkedList<Pair<Integer, Integer>> findColumnRowKeyRelationships(DimensionDesc dim) {
- LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey = new LinkedList<Pair<Integer, Integer>>();
- for (TblColRef colRef : dim.getColumnRefs()) {
- int dimTableIndex = colRef.getColumnDesc().getZeroBasedIndex();
- int rowKeyIndex = cubeDesc.getRowkey().getRowKeyIndexByColumnName(colRef.getName());
- dimTblColAsRowKey.add(new Pair<Integer, Integer>(dimTableIndex, rowKeyIndex));
- }
- return dimTblColAsRowKey;
- }
-
- private void prepareMetrics() {
- List<MeasureDesc> measures = cubeDesc.getMeasures();
- int measureSize = measures.size();
- measureColumnIndice = new int[measureSize][];
- for (int i = 0; i < measureSize; i++) {
- FunctionDesc func = measures.get(i).getFunction();
- List<TblColRef> colRefs = func.getParameter().getColRefs();
- if (colRefs == null) {
- measureColumnIndice[i] = null;
- } else {
- measureColumnIndice[i] = new int[colRefs.size()];
- for (int j = 0; j < colRefs.size(); j++) {
- TblColRef c = colRefs.get(j);
- int factTblIdx = factTableDesc.findColumnByName(c.getName()).getZeroBasedIndex();
- measureColumnIndice[i][j] = factTblIdx;
- }
- }
- }
- }
-
- private byte[] trimSplitBuffer(SplittedBytes splittedBytes) {
- return Arrays.copyOf(splittedBytes.value, splittedBytes.length);
- }
-
- private byte[] buildKey(SplittedBytes[] splitBuffers) {
-
- int filledDimension = 0;// debug
-
- // join lookup tables, and fill into RowKey the columns in lookup table
- for (TableJoin tableJoin : this.tableJoins) {
- String dimTblName = tableJoin.lookupTableName;
- LookupBytesTable dimTbl = this.lookupTables.get(dimTblName);
- ByteArray[] rawKey = new ByteArray[tableJoin.fkIndice.size()];
- for (int i = 0; i < tableJoin.fkIndice.size(); ++i) {
- rawKey[i] = new ByteArray(trimSplitBuffer(splitBuffers[tableJoin.fkIndice.get(i)]));
- }
- Array<ByteArray> key = new Array<ByteArray>(rawKey);
- ByteArray[] dimRow = dimTbl.getRow(key);
- if (dimRow == null) {
- if (tableJoin.joinType.equalsIgnoreCase("INNER")) {
- return null;
- } else if (tableJoin.joinType.equalsIgnoreCase("LEFT")) {
- for (Pair<Integer, Integer> relation : tableJoin.dimTblColAsRowKey) {
- keyBytesBuf[relation.getSecond()] = nullValue;
- filledDimension++;
- }
- }
- } else {
- for (Pair<Integer, Integer> relation : tableJoin.dimTblColAsRowKey) {
- keyBytesBuf[relation.getSecond()] = dimRow[relation.getFirst()].array();
- filledDimension++;
- }
- }
- }
-
- // fill into RowKey the columns in fact table
- for (Pair<Integer, Integer> relation : this.factTblColAsRowKey) {
- keyBytesBuf[relation.getSecond()] = trimSplitBuffer(splitBuffers[relation.getFirst()]);
- filledDimension++;
- }
-
- assert filledDimension == keyBytesBuf.length;
-
- // all the row key slots(keyBytesBuf) should be complete now
- return rowKeyEncoder.encode(keyBytesBuf);
- }
-
- private void buildValue(SplittedBytes[] splitBuffers) {
-
- for (int i = 0; i < measures.length; i++) {
- byte[] valueBytes = getValueBytes(splitBuffers, i);
- measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
- }
-
- valueBuf.clear();
- measureCodec.encode(measures, valueBuf);
- }
-
- private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) {
- MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
- ParameterDesc paramDesc = desc.getFunction().getParameter();
- int[] flatTableIdx = this.measureColumnIndice[measureIdx];
-
- byte[] result = null;
-
- // constant
- if (flatTableIdx == null) {
- result = Bytes.toBytes(paramDesc.getValue());
- }
- // column values
- else {
- for (int i = 0; i < flatTableIdx.length; i++) {
- SplittedBytes split = splitBuffers[flatTableIdx[i]];
- result = Arrays.copyOf(split.value, split.length);
- }
- }
-
- if (desc.getFunction().isCount()) {
- result = Bytes.toBytes("1");
- }
-
- return result;
- }
-
- @Override
- public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
- // combining the hive table flattening logic into base cuboid building.
- // the input of this mapper is the fact table rows
-
- counter++;
- if (counter % BatchConstants.COUNTER_MAX == 0) {
- logger.info("Handled " + counter + " records!");
- }
-
- if (!byteRowDelimiterInferred)
- byteRowDelimiter = bytesSplitter.inferByteRowDelimiter(value.getBytes(), value.getLength(), factTableDesc.getColumns().length);
-
- bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
-
- try {
- byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
- if (rowKey == null)
- return;// skip this fact table row
-
- outputKey.set(rowKey, 0, rowKey.length);
-
- buildValue(bytesSplitter.getSplitBuffers());
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
- context.write(outputKey, outputValue);
-
- } catch (Throwable t) {
- logger.error("", t);
- context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Error records").increment(1L);
- return;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java
deleted file mode 100644
index 5c94542..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Lists Kylin HBase tables (names starting with the shared storage prefix) whose metadata store tag
- * is not in the {@code -whitelist}, and drops them when {@code -delete true} is given.
- * Tables without a tag are never dropped.
- */
-public class OrphanHBaseCleanJob extends AbstractHadoopJob {
-
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_WHITELIST = OptionBuilder.withArgName("whitelist").hasArg().isRequired(true).withDescription("metadata store whitelist, separated with comma").create("whitelist");
-
-    protected static final Logger logger = LoggerFactory.getLogger(OrphanHBaseCleanJob.class);
-
-    boolean delete = false;
-    Set<String> metastoreWhitelistSet = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
-
-    /**
-     * Parses {@code -delete} (anything but "true" means dry run) and the comma-separated
-     * {@code -whitelist}, then cleans the orphan tables.
-     */
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        logger.info("jobs args: " + Arrays.toString(args));
-        try {
-            options.addOption(OPTION_DELETE);
-            options.addOption(OPTION_WHITELIST);
-            parseOptions(options, args);
-
-            logger.info("options: '" + getOptionsAsString() + "'");
-            logger.info("delete option value: '" + getOptionValue(OPTION_DELETE) + "'");
-            delete = Boolean.parseBoolean(getOptionValue(OPTION_DELETE));
-            String[] metastoreWhitelist = getOptionValue(OPTION_WHITELIST).split(",");
-
-            for (String ms : metastoreWhitelist) {
-                // trim: with "a, b" the entry " b" would not match tag "b" and b's tables would be dropped
-                String entry = ms.trim();
-                logger.info("metadata store in white list: " + entry);
-                metastoreWhitelistSet.add(entry);
-            }
-
-            Configuration conf = HBaseConfiguration.create(getConf());
-
-            cleanUnusedHBaseTables(conf);
-
-            return 0;
-        } catch (Exception e) {
-            e.printStackTrace(System.err);
-            throw e;
-        }
-    }
-
-    /** Drops (or, in dry-run mode, prints) every prefixed HTable whose tag is not whitelisted. */
-    private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
-
-        // get all kylin hbase tables
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
-        try {
-            String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
-            HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
-            List<String> allTablesNeedToBeDropped = new ArrayList<String>();
-            for (HTableDescriptor desc : tableDescriptors) {
-                String host = desc.getValue(IRealizationConstants.HTableTag);
-                if (host == null) {
-                    // contains(null) on the case-insensitive TreeSet throws NPE once the whitelist is
-                    // non-empty; an owner-less table is also too risky to drop automatically
-                    logger.info("HTable {} has no metadata store tag, skipped", desc.getTableName());
-                } else if (!metastoreWhitelistSet.contains(host)) {
-                    logger.info("HTable {} is recognized as orphan because its tag is {}", desc.getTableName(), host);
-                    //collect orphans
-                    allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
-                } else {
-                    logger.info("HTable {} belongs to {}", desc.getTableName(), host);
-                }
-            }
-
-            if (delete) {
-                // drop tables
-                for (String htableName : allTablesNeedToBeDropped) {
-                    logger.info("Deleting HBase table " + htableName);
-                    if (hbaseAdmin.tableExists(htableName)) {
-                        if (hbaseAdmin.isTableEnabled(htableName)) {
-                            hbaseAdmin.disableTable(htableName);
-                        }
-
-                        hbaseAdmin.deleteTable(htableName);
-                        logger.info("Deleted HBase table " + htableName);
-                    } else {
-                        logger.info("HBase table " + htableName + " does not exist");
-                    }
-                }
-            } else {
-                System.out.println("--------------- Tables To Be Dropped ---------------");
-                for (String htableName : allTablesNeedToBeDropped) {
-                    System.out.println(htableName);
-                }
-                System.out.println("----------------------------------------------------");
-            }
-        } finally {
-            // previously skipped when any HBase call threw, leaking the admin connection
-            hbaseAdmin.close();
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new OrphanHBaseCleanJob(), args);
-        System.exit(exitCode);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
deleted file mode 100644
index fdde11f..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.cmd.ICommandOutput;
-import org.apache.kylin.job.cmd.ShellCmd;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.JobException;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class StorageCleanupJob extends AbstractHadoopJob {
-
- @SuppressWarnings("static-access")
- private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");
-
- protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);
-
- public static final long TIME_THREADSHOLD = 2 * 24 * 3600 * 1000l; // 2 days
-
- boolean delete = false;
-
- protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-    /**
-     * Tool entry point. Reads the optional {@code -delete} flag; any value other than "true" means
-     * dry run. It then cleans intermediate Hive tables, HDFS job working directories and HBase
-     * tables, in that order. On any failure the usage text is printed and the exception rethrown.
-     *
-     * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
-     */
-    @Override
-    public int run(String[] args) throws Exception {
-        final Options options = new Options();
-        logger.info("jobs args: " + Arrays.toString(args));
-
-        try {
-            options.addOption(OPTION_DELETE);
-            parseOptions(options, args);
-            logger.info("options: '" + getOptionsAsString() + "'");
-
-            final String deleteFlag = getOptionValue(OPTION_DELETE);
-            logger.info("delete option value: '" + deleteFlag + "'");
-            delete = Boolean.parseBoolean(deleteFlag);
-
-            final Configuration hbaseConf = HBaseConfiguration.create(getConf());
-            cleanUnusedIntermediateHiveTable(hbaseConf);
-            cleanUnusedHdfsFiles(hbaseConf);
-            cleanUnusedHBaseTables(hbaseConf);
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-        return 0;
-    }
-
-    /**
-     * Drops (or, in dry-run mode, prints) Kylin HBase tables of this metadata store that no cube or
-     * II segment uses any more.
-     *
-     * <p>Candidates are tables named with the shared storage prefix whose tag equals this instance's
-     * metadata URL prefix, and that have no creation time or were created more than
-     * {@code TIME_THREADSHOLD} ms ago. Tables still referenced by a segment are then removed from
-     * the list.</p>
-     */
-    private void cleanUnusedHBaseTables(Configuration conf) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
-        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-        // get all kylin hbase tables
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
-        try {
-            String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
-            HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
-            List<String> allTablesNeedToBeDropped = new ArrayList<String>();
-            for (HTableDescriptor desc : tableDescriptors) {
-                String host = desc.getValue(IRealizationConstants.HTableTag);
-                String creationTime = desc.getValue(IRealizationConstants.HTableCreationTime);
-                if (KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix().equalsIgnoreCase(host)) {
-                    //only take care htables that belongs to self, and created more than 2 days
-                    if (StringUtils.isEmpty(creationTime) || (System.currentTimeMillis() - Long.valueOf(creationTime) > TIME_THREADSHOLD)) {
-                        allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
-                    }
-                }
-            }
-
-            // remove every segment htable from drop list
-            for (CubeInstance cube : cubeMgr.listAllCubes()) {
-                for (CubeSegment seg : cube.getSegments()) {
-                    String tablename = seg.getStorageLocationIdentifier();
-                    if (allTablesNeedToBeDropped.contains(tablename)) {
-                        allTablesNeedToBeDropped.remove(tablename);
-                        logger.info("Exclude table " + tablename + " from drop list, as the table belongs to cube " + cube.getName() + " with status " + cube.getStatus());
-                    }
-                }
-            }
-
-            // remove every ii segment htable from drop list
-            for (IIInstance ii : iiManager.listAllIIs()) {
-                for (IISegment seg : ii.getSegments()) {
-                    String tablename = seg.getStorageLocationIdentifier();
-
-                    if (allTablesNeedToBeDropped.contains(tablename)) {
-                        allTablesNeedToBeDropped.remove(tablename);
-                        logger.info("Exclude table " + tablename + " from drop list, as the table belongs to ii " + ii.getName() + " with status " + ii.getStatus());
-                    }
-                }
-            }
-
-            if (delete) {
-                // drop tables
-                for (String htableName : allTablesNeedToBeDropped) {
-                    logger.info("Deleting HBase table " + htableName);
-                    if (hbaseAdmin.tableExists(htableName)) {
-                        if (hbaseAdmin.isTableEnabled(htableName)) {
-                            hbaseAdmin.disableTable(htableName);
-                        }
-
-                        hbaseAdmin.deleteTable(htableName);
-                        logger.info("Deleted HBase table " + htableName);
-                    } else {
-                        logger.info("HBase table " + htableName + " does not exist");
-                    }
-                }
-            } else {
-                System.out.println("--------------- Tables To Be Dropped ---------------");
-                for (String htableName : allTablesNeedToBeDropped) {
-                    System.out.println(htableName);
-                }
-                System.out.println("----------------------------------------------------");
-            }
-        } finally {
-            // previously skipped when any HBase or metadata call threw, leaking the admin connection
-            hbaseAdmin.close();
-        }
-    }
-
-    /**
-     * Deletes (or, in dry-run mode, prints) job working directories under the HDFS working directory
-     * whose name starts with {@code JobInstance.JOB_WORKING_DIR_PREFIX}.
-     *
-     * <p>Directories of jobs not yet in a final state, and of jobs recorded as the last build job of
-     * some cube segment, are kept.</p>
-     */
-    private void cleanUnusedHdfsFiles(Configuration conf) throws IOException {
-        JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
-        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-        FileSystem fs = FileSystem.get(conf);
-        List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>();
-        FileStatus[] fStatus = fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()));
-        for (FileStatus status : fStatus) {
-            String path = status.getPath().getName();
-            if (path.startsWith(JobInstance.JOB_WORKING_DIR_PREFIX)) {
-                String kylinJobPath = engineConfig.getHdfsWorkingDirectory() + path;
-                allHdfsPathsNeedToBeDeleted.add(kylinJobPath);
-            }
-        }
-
-        List<String> allJobs = executableManager.getAllJobIds();
-        for (String jobId : allJobs) {
-            // only jobs in a final state may lose their intermediate files
-            final ExecutableState state = executableManager.getOutput(jobId).getState();
-            if (!state.isFinalState()) {
-                String path = JobInstance.getJobWorkingDir(jobId, engineConfig.getHdfsWorkingDirectory());
-                // log only real removals; the old code logged for every non-final job, listed or not
-                if (allHdfsPathsNeedToBeDeleted.remove(path)) {
-                    logger.info("Remove " + path + " from deletion list, as the path belongs to job " + jobId + " with status " + state);
-                }
-            }
-        }
-
-        // remove every segment working dir from deletion list
-        for (CubeInstance cube : cubeMgr.listAllCubes()) {
-            for (CubeSegment seg : cube.getSegments()) {
-                String jobUuid = seg.getLastBuildJobID();
-                if (jobUuid != null && !jobUuid.isEmpty()) {
-                    String path = JobInstance.getJobWorkingDir(jobUuid, engineConfig.getHdfsWorkingDirectory());
-                    if (allHdfsPathsNeedToBeDeleted.remove(path)) {
-                        logger.info("Remove " + path + " from deletion list, as the path belongs to segment " + seg + " of cube " + cube.getName());
-                    }
-                }
-            }
-        }
-
-        if (delete) {
-            // remove files
-            for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
-                logger.info("Deleting hdfs path " + hdfsPath);
-                Path p = new Path(hdfsPath);
-                if (fs.exists(p)) {
-                    fs.delete(p, true);
-                    logger.info("Deleted hdfs path " + hdfsPath);
-                } else {
-                    logger.info("Hdfs path " + hdfsPath + " does not exist");
-                }
-            }
-        } else {
-            System.out.println("--------------- HDFS Path To Be Deleted ---------------");
-            for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
-                System.out.println(hdfsPath);
-            }
-            System.out.println("-------------------------------------------------------");
-        }
-
-    }
-
-    /**
-     * Lists the {@code kylin_intermediate_*} Hive tables in the intermediate-table database and drops those
-     * whose trailing job UUID belongs to a known job that has reached a final state. When {@code delete} is
-     * false the candidate tables are only printed. Failures of the hive shell command are logged and the
-     * cleanup is abandoned (listing) or skipped (dropping) rather than propagated.
-     *
-     * @param conf Hadoop configuration (not used by this method)
-     * @throws IOException declared for reading the captured command output
-     */
-    private void cleanUnusedIntermediateHiveTable(Configuration conf) throws IOException {
-        final int uuidLength = 36;
-        final String tablePrefix = "kylin_intermediate_";
-        final String useDatabaseHql = "USE " + KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable() + ";";
-        StringBuilder buf = new StringBuilder();
-        buf.append("hive -e \"");
-        buf.append(useDatabaseHql);
-        buf.append("show tables " + "\'kylin_intermediate_*\'" + "; ");
-        buf.append("\"");
-
-        ShellCmd cmd = new ShellCmd(buf.toString(), null, 0, null, null, false);
-        ICommandOutput output = null;
-        try {
-            output = cmd.execute();
-        } catch (JobException e) {
-            logger.error("Failed to list intermediate hive tables", e);
-        }
-        if (output == null || output.getOutput() == null) {
-            return;
-        }
-
-        List<String> allJobs = executableManager.getAllJobIds();
-        List<String> workingJobList = new ArrayList<String>();
-        for (String jobId : allJobs) {
-            // a job that has not reached a final state may still be using its intermediate table
-            final ExecutableState state = executableManager.getOutput(jobId).getState();
-            if (!state.isFinalState()) {
-                workingJobList.add(jobId);
-                logger.info("Skip intermediate hive table with job id " + jobId + " with job status " + state);
-            }
-        }
-
-        List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>();
-        try (BufferedReader reader = new BufferedReader(new StringReader(output.getOutput()))) {
-            String line;
-            while ((line = reader.readLine()) != null) {
-                // The last 36 characters are taken as the job UUID, written with '_' instead of '-'.
-                // Names shorter than prefix + UUID would make substring() throw or read into the prefix.
-                if (!line.startsWith(tablePrefix) || line.length() < tablePrefix.length() + uuidLength) {
-                    continue;
-                }
-                String uuid = line.substring(line.length() - uuidLength).replace("_", "-");
-                if (allJobs.contains(uuid) && !workingJobList.contains(uuid)) {
-                    allHiveTablesNeedToBeDeleted.add(line);
-                }
-            }
-        }
-
-        if (delete) {
-            // with nothing to drop, skip launching a hive process that would only run "USE <db>"
-            if (!allHiveTablesNeedToBeDeleted.isEmpty()) {
-                buf.setLength(0);
-                buf.append("hive -e \"");
-                buf.append(useDatabaseHql);
-                for (String delHive : allHiveTablesNeedToBeDeleted) {
-                    buf.append("drop table if exists " + delHive + "; ");
-                    logger.info("Remove " + delHive + " from hive tables.");
-                }
-                buf.append("\"");
-                cmd = new ShellCmd(buf.toString(), null, 0, null, null, false);
-                try {
-                    cmd.execute();
-                } catch (JobException e) {
-                    logger.error("Failed to drop intermediate hive tables", e);
-                }
-            }
-        } else {
-            System.out.println("------ Intermediate Hive Tables To Be Dropped ------");
-            for (String hiveTable : allHiveTablesNeedToBeDeleted) {
-                System.out.println(hiveTable);
-            }
-            System.out.println("----------------------------------------------------");
-        }
-    }
-
-    /**
-     * Command-line entry point: runs a new StorageCleanupJob through ToolRunner and terminates the JVM
-     * with the exit code it returns.
-     */
-    public static void main(String[] args) throws Exception {
-        final StorageCleanupJob cleanupJob = new StorageCleanupJob();
-        final int returnCode = ToolRunner.run(cleanupJob, args);
-        System.exit(returnCode);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java b/job/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
deleted file mode 100644
index 7b9831a..0000000
--- a/job/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.apache.kylin.job.monitor;
-
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Command-line entry point for {@link StreamingMonitor}.
- * <p>
- * The first argument must be {@code monitor}; the rest are option/value pairs:
- * <ul>
- * <li>{@code -receivers} (required): mail receivers separated by ';'</li>
- * <li>{@code -tableName} with {@code -host} and {@code -authorization} (both required then) and an optional
- * {@code -projectName} (defaults to "default"): runs {@link StreamingMonitor#checkCountAll}</li>
- * <li>{@code -cubeName} (optionally with {@code -host}): runs {@link StreamingMonitor#checkCube}</li>
- * </ul>
- * Malformed arguments fail with an IllegalArgumentException (or NullPointerException for a missing required
- * value) carrying a descriptive message; an unknown option fails with a RuntimeException.
- */
-public class MonitorCLI {
-
-    private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class);
-
-    public static void main(String[] args) {
-        Preconditions.checkArgument(args.length > 0 && args[0].equals("monitor"), "the first argument must be 'monitor'");
-
-        List<String> receivers = null;
-        String host = null;
-        String tableName = null;
-        String authorization = null;
-        String cubeName = null;
-        String projectName = "default";
-        for (int i = 1; i < args.length; i++) {
-            String argName = args[i];
-            // each option consumes the following argument as its value
-            switch (argName) {
-            case "-receivers":
-                receivers = Lists.newArrayList(StringUtils.split(optionValue(args, ++i, argName), ";"));
-                break;
-            case "-host":
-                host = optionValue(args, ++i, argName);
-                break;
-            case "-tableName":
-                tableName = optionValue(args, ++i, argName);
-                break;
-            case "-authorization":
-                authorization = optionValue(args, ++i, argName);
-                break;
-            case "-cubeName":
-                cubeName = optionValue(args, ++i, argName);
-                break;
-            case "-projectName":
-                projectName = optionValue(args, ++i, argName);
-                break;
-            default:
-                throw new RuntimeException("invalid argName:" + argName);
-            }
-        }
-        Preconditions.checkArgument(receivers != null && !receivers.isEmpty(), "-receivers is required");
-        final StreamingMonitor streamingMonitor = new StreamingMonitor();
-        if (tableName != null) {
-            logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
-            Preconditions.checkNotNull(host, "-host is required together with -tableName");
-            Preconditions.checkNotNull(authorization, "-authorization is required together with -tableName");
-            streamingMonitor.checkCountAll(receivers, host, authorization, projectName, tableName);
-        }
-        if (cubeName != null) {
-            logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";")));
-            streamingMonitor.checkCube(receivers, cubeName, host);
-        }
-        System.exit(0);
-    }
-
-    /**
-     * Returns {@code args[index]}, the value of option {@code argName}, failing with a clear message instead
-     * of an ArrayIndexOutOfBoundsException when the option is the last argument.
-     */
-    private static String optionValue(String[] args, int index, String argName) {
-        Preconditions.checkArgument(index < args.length, "missing value for option " + argName);
-        return args[index];
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java b/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
deleted file mode 100644
index e23f065..0000000
--- a/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
+++ /dev/null
@@ -1,154 +0,0 @@
-package org.apache.kylin.job.monitor;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
-import org.apache.commons.httpclient.methods.PostMethod;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.MailService;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Health checks for a Kylin deployment whose findings are e-mailed to a list of receivers.
- */
-public class StreamingMonitor {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class);
-
-    /**
-     * Posts {@code select count(*) from tableName} to {@code host + "/kylin/api/query"} and mails the request,
-     * the HTTP status and either the results/duration (status 200) or the raw response to {@code receivers}.
-     * Any exception is captured as a stack trace in the mail body and the check is reported as failed.
-     *
-     * @param authorization credential placed after "Basic " in the Authorization header
-     */
-    public void checkCountAll(List<String> receivers, String host, String authorization, String projectName, String tableName) {
-        String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") ";
-        StringBuilder stringBuilder = new StringBuilder();
-        String url = host + "/kylin/api/query";
-        PostMethod request = new PostMethod(url);
-        // decided once at the end so a failure after a 200 cannot yield "succeedfailed"
-        boolean succeed = false;
-        try {
-            request.addRequestHeader("Authorization", "Basic " + authorization);
-            request.addRequestHeader("Content-Type", "application/json");
-            String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", tableName, projectName);
-            // explicit UTF-8 instead of the platform default charset
-            request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes("UTF-8")));
-
-            int statusCode = new HttpClient().executeMethod(request);
-            String msg = Bytes.toString(request.getResponseBody());
-            stringBuilder.append("host:").append(host).append("\n");
-            stringBuilder.append("query:").append(query).append("\n");
-            stringBuilder.append("statusCode:").append(statusCode).append("\n");
-            if (statusCode == 200) {
-                final HashMap<?, ?> hashMap = JsonUtil.readValue(msg, HashMap.class);
-                stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n");
-                stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n");
-                succeed = true;
-            } else {
-                stringBuilder.append("response:").append(msg).append("\n");
-            }
-        } catch (Exception e) {
-            final StringWriter out = new StringWriter();
-            e.printStackTrace(new PrintWriter(out));
-            succeed = false;
-            stringBuilder.append(out.toString());
-        } finally {
-            request.releaseConnection();
-        }
-        title += succeed ? "succeed" : "failed";
-        logger.info("title:" + title);
-        logger.info("content:" + stringBuilder.toString());
-        sendMail(receivers, title, stringBuilder.toString());
-    }
-
-    /**
-     * Returns (end of earlier segment, start of later segment) for every pair of neighbouring READY segments
-     * of {@code cubeName} that leave an uncovered range between them.
-     *
-     * @throws NullPointerException if the cube does not exist
-     */
-    public static final List<Pair<Long, Long>> findGaps(String cubeName) {
-        return collectGaps(getSortedReadySegments(cubeName));
-    }
-
-    private static List<CubeSegment> getSortedReadySegments(String cubeName) {
-        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        Preconditions.checkNotNull(cube);
-        return sortedReadySegments(cube);
-    }
-
-    /** READY segments of {@code cube}, sorted by CubeSegment's natural ordering (presumably by start — confirm). */
-    private static List<CubeSegment> sortedReadySegments(CubeInstance cube) {
-        final List<CubeSegment> segments = cube.getSegment(SegmentStatusEnum.READY);
-        logger.info("totally " + segments.size() + " cubeSegments");
-        Collections.sort(segments);
-        return segments;
-    }
-
-    /**
-     * Returns the names of neighbouring READY segments of {@code cubeName} whose date ranges overlap, i.e. the
-     * earlier segment ends after the later one starts. Gaps are not reported here; see {@link #findGaps}.
-     *
-     * @throws NullPointerException if the cube does not exist
-     */
-    public static final List<Pair<String, String>> findOverlaps(String cubeName) {
-        return collectOverlaps(getSortedReadySegments(cubeName));
-    }
-
-    /** (end, start) pairs wherever a segment ends before its sorted successor starts. */
-    private static List<Pair<Long, Long>> collectGaps(List<CubeSegment> segments) {
-        List<Pair<Long, Long>> gaps = Lists.newArrayList();
-        for (int i = 0; i + 1 < segments.size(); ++i) {
-            CubeSegment first = segments.get(i);
-            CubeSegment second = segments.get(i + 1);
-            if (first.getDateRangeEnd() < second.getDateRangeStart()) {
-                gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
-            }
-        }
-        return gaps;
-    }
-
-    /** Name pairs wherever a segment ends after its sorted successor starts. */
-    private static List<Pair<String, String>> collectOverlaps(List<CubeSegment> segments) {
-        List<Pair<String, String>> overlaps = Lists.newArrayList();
-        for (int i = 0; i + 1 < segments.size(); ++i) {
-            CubeSegment first = segments.get(i);
-            CubeSegment second = segments.get(i + 1);
-            if (first.getDateRangeEnd() > second.getDateRangeStart()) {
-                overlaps.add(Pair.newPair(first.getName(), second.getName()));
-            }
-        }
-        return overlaps;
-    }
-
-    /**
-     * Checks the READY segments of {@code cubeName} for gaps and overlaps and mails a report to
-     * {@code receivers} when any are found. A missing cube is only logged.
-     *
-     * @param host used only in the mail subject
-     */
-    public void checkCube(List<String> receivers, String cubeName, String host) {
-        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        if (cube == null) {
-            logger.info("cube:" + cubeName + " does not exist");
-            return;
-        }
-        // analyse one snapshot of the segment list so gaps and overlaps are consistent with each other
-        final List<CubeSegment> segments = sortedReadySegments(cube);
-        final List<Pair<Long, Long>> gaps = collectGaps(segments);
-        final List<Pair<String, String>> overlaps = collectOverlaps(segments);
-        StringBuilder content = new StringBuilder();
-        if (!gaps.isEmpty()) {
-            content.append("all gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() {
-                @Nullable
-                @Override
-                public String apply(Pair<Long, Long> input) {
-                    return parseInterval(input);
-                }
-            }), "\n")).append("\n");
-        }
-        if (!overlaps.isEmpty()) {
-            content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
-        }
-        if (content.length() > 0) {
-            logger.info(content.toString());
-            sendMail(receivers, String.format("%s has gaps or overlaps on host %s", cubeName, host), content.toString());
-        } else {
-            logger.info("no gaps or overlaps");
-        }
-    }
-
-    /** Formats an interval as {@code {start(date), end(date)}} with the raw millis and their Date rendering. */
-    private String parseInterval(Pair<Long, Long> interval) {
-        return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString());
-    }
-
-    /** Sends a plain (non-HTML) mail through MailService using the environment's KylinConfig. */
-    private void sendMail(List<String> receivers, String title, String content) {
-        final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv());
-        mailService.sendMail(receivers, title, content, false);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
deleted file mode 100644
index 029d4d2..0000000
--- a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-/**
- * Mutable settings holder for bootstrapping a streaming job. Each property is exposed through a plain
- * getter/setter pair; the initial values are noted on the fields.
- */
-public class BootstrapConfig {
-
-    /** Streaming configuration to bootstrap; unset (null) by default. */
-    private String streaming;
-
-    /** Partition id; -1 by default (presumably meaning "no specific partition" — confirm with callers). */
-    private int partitionId = -1;
-
-    /** One-off flag; true by default. */
-    private boolean oneOff = true;
-
-    /** Start, end and margin values; all 0 by default (units not visible here — presumably millis). */
-    private long start = 0L;
-    private long end = 0L;
-    private long margin = 0L;
-
-    /** Fill-gap flag; false by default. */
-    private boolean fillGap = false;
-
-    public String getStreaming() {
-        return this.streaming;
-    }
-
-    public void setStreaming(String streaming) {
-        this.streaming = streaming;
-    }
-
-    public int getPartitionId() {
-        return this.partitionId;
-    }
-
-    public void setPartitionId(int partitionId) {
-        this.partitionId = partitionId;
-    }
-
-    public boolean isOneOff() {
-        return this.oneOff;
-    }
-
-    public void setOneOff(boolean oneOff) {
-        this.oneOff = oneOff;
-    }
-
-    public long getStart() {
-        return this.start;
-    }
-
-    public void setStart(long start) {
-        this.start = start;
-    }
-
-    public long getEnd() {
-        return this.end;
-    }
-
-    public void setEnd(long end) {
-        this.end = end;
-    }
-
-    public long getMargin() {
-        return this.margin;
-    }
-
-    public void setMargin(long margin) {
-        this.margin = margin;
-    }
-
-    public boolean isFillGap() {
-        return this.fillGap;
-    }
-
-    public void setFillGap(boolean fillGap) {
-        this.fillGap = fillGap;
-    }
-}