You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2017/01/19 06:55:11 UTC
[01/10] kylin git commit: KYLIN-2389 Improve resource utilization for
DistributedScheduler [Forced Update!]
Repository: kylin
Updated Branches:
refs/heads/KYLIN-2394 f900fdcf3 -> dfb5fac9f (forced update)
KYLIN-2389 Improve resource utilization for DistributedScheduler
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/837bd820
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/837bd820
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/837bd820
Branch: refs/heads/KYLIN-2394
Commit: 837bd8200b250f38fcfb2d221764d5aca0c66403
Parents: e894465
Author: kangkaisen <ka...@163.com>
Authored: Fri Jan 13 19:58:41 2017 +0800
Committer: kangkaisen <ka...@163.com>
Committed: Wed Jan 18 16:14:24 2017 +0800
----------------------------------------------------------------------
.../impl/threadpool/DistributedScheduler.java | 8 +--
.../kylin/job/lock/DistributedJobLock.java | 2 +
.../apache/kylin/rest/service/JobService.java | 45 --------------
.../hbase/util/ZookeeperDistributedJobLock.java | 63 ++++++++++++++++----
4 files changed, 58 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/837bd820/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 3436529..84e62d5 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -195,13 +195,13 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
}
}
- //release job lock only when the all tasks of the job finish and the job server keep the cube lock.
+ //release job lock when job state is ready or running and the job server keep the cube lock.
private void releaseJobLock(AbstractExecutable executable) {
if (executable instanceof DefaultChainedExecutable) {
String segmentId = executable.getParam(SEGMENT_ID);
ExecutableState state = executable.getStatus();
- if (state == ExecutableState.SUCCEED || state == ExecutableState.ERROR || state == ExecutableState.DISCARDED) {
+ if (state != ExecutableState.READY && state != ExecutableState.RUNNING) {
if (segmentWithLocks.contains(segmentId)) {
logger.info(executable.toString() + " will release the lock for the segment: " + segmentId);
jobLock.unlockWithName(segmentId);
@@ -232,7 +232,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
if (executable instanceof DefaultChainedExecutable && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) && !nodeData.equalsIgnoreCase(serverName)) {
try {
logger.warn(nodeData + " has released the lock for: " + segmentId + " but the job still running. so " + serverName + " resume the job");
- if (jobLock.lockWithName(segmentId, serverName)) {
+ if (!jobLock.isHasLocked(segmentId)) {
executableManager.resumeRunningJobForce(executable.getId());
fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
break;
@@ -302,7 +302,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
AbstractExecutable executable = executableManager.getJob(id);
if (output.getState() == ExecutableState.RUNNING && executable instanceof DefaultChainedExecutable) {
try {
- if (jobLock.lockWithName(executable.getParam(SEGMENT_ID), serverName)) {
+ if (!jobLock.isHasLocked(executable.getParam(SEGMENT_ID))) {
executableManager.resumeRunningJobForce(executable.getId());
fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/837bd820/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
index 9335e56..1c173ec 100644
--- a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
@@ -24,6 +24,8 @@ public interface DistributedJobLock extends JobLock {
boolean lockWithName(String name, String serverName);
+ boolean isHasLocked(String segmentId);
+
void unlockWithName(String name);
void watchLock(ExecutorService pool, DoWatchLock doWatch);
http://git-wip-us.apache.org/repos/asf/kylin/blob/837bd820/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 4709a91..ed24a9d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -19,8 +19,6 @@
package org.apache.kylin.rest.service;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
@@ -56,7 +54,6 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.lock.DistributedJobLock;
import org.apache.kylin.job.lock.JobLock;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
@@ -282,15 +279,12 @@ public class JobService extends BasicService implements InitializingBean {
SourcePartition sourcePartition = new SourcePartition(startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
sourcePartition = source.parsePartitionBeforeBuild(cube, sourcePartition);
newSeg = getCubeManager().appendSegment(cube, sourcePartition);
- lockSegment(newSeg.getUuid());
job = EngineFactory.createBatchCubingJob(newSeg, submitter);
} else if (buildType == CubeBuildTypeEnum.MERGE) {
newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force);
- lockSegment(newSeg.getUuid());
job = EngineFactory.createBatchMergeJob(newSeg, submitter);
} else if (buildType == CubeBuildTypeEnum.REFRESH) {
newSeg = getCubeManager().refreshSegment(cube, startDate, endDate, startOffset, endOffset);
- lockSegment(newSeg.getUuid());
job = EngineFactory.createBatchCubingJob(newSeg, submitter);
} else {
throw new JobException("invalid build type:" + buildType);
@@ -312,7 +306,6 @@ public class JobService extends BasicService implements InitializingBean {
}
}
throw e;
-
}
JobInstance jobInstance = getSingleJobInstance(job);
@@ -454,15 +447,11 @@ public class JobService extends BasicService implements InitializingBean {
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
public void resumeJob(JobInstance job) throws IOException, JobException {
- lockSegment(job.getRelatedSegment());
-
getExecutableManager().resumeJob(job.getId());
}
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
public void rollbackJob(JobInstance job, String stepId) throws IOException, JobException {
- lockSegment(job.getRelatedSegment());
-
getExecutableManager().rollbackJob(job.getId(), stepId);
}
@@ -486,47 +475,15 @@ public class JobService extends BasicService implements InitializingBean {
}
getExecutableManager().discardJob(job.getId());
- //release the segment lock when discarded the job but the job hasn't scheduled
- releaseSegmentLock(job.getRelatedSegment());
-
return job;
}
-
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
public JobInstance pauseJob(JobInstance job) throws IOException, JobException {
getExecutableManager().pauseJob(job.getId());
-
- //release the segment lock when discarded the job but the job hasn't scheduled
- releaseSegmentLock(job.getRelatedSegment());
-
return job;
}
- private void lockSegment(String segmentId) throws JobException {
- if (jobLock instanceof DistributedJobLock) {
- if (!((DistributedJobLock) jobLock).lockWithName(segmentId, getServerName())) {
- throw new JobException("Fail to get the segment lock, the segment may be building in another job server");
- }
- }
- }
-
- private void releaseSegmentLock(String segmentId) {
- if (jobLock instanceof DistributedJobLock) {
- ((DistributedJobLock) jobLock).unlockWithName(segmentId);
- }
- }
-
- private String getServerName() {
- String serverName = null;
- try {
- serverName = InetAddress.getLocalHost().getHostName();
- } catch (UnknownHostException e) {
- logger.error("fail to get the hostname");
- }
- return serverName;
- }
-
public List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName, final Set<ExecutableState> statusList, final Map<String, Output> allOutputs) {
return listAllCubingJobs(cubeName, projectName, statusList, -1L, -1L, allOutputs);
}
@@ -584,6 +541,4 @@ public class JobService extends BasicService implements InitializingBean {
public List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName) {
return listAllCubingJobs(cubeName, projectName, EnumSet.allOf(ExecutableState.class), getExecutableManager().getAllOutputs());
}
-
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/837bd820/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
index 5a44cc1..ee7cd50 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -96,6 +96,8 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
* @param serverName the hostname of job server
*
* @return <tt>true</tt> if the segment locked successfully
+ *
+ * @since 2.0
*/
@Override
@@ -110,13 +112,13 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
return false;
}
if (zkClient.checkExists().forPath(lockPath) != null) {
- if (hasLock(serverName, lockPath)) {
+ if (isKeepLock(serverName, lockPath)) {
hasLock = true;
logger.info(serverName + " has kept the lock for segment: " + segmentId);
}
} else {
zkClient.create().withMode(CreateMode.EPHEMERAL).forPath(lockPath, serverName.getBytes(Charset.forName("UTF-8")));
- if (hasLock(serverName, lockPath)) {
+ if (isKeepLock(serverName, lockPath)) {
hasLock = true;
logger.info(serverName + " lock the segment: " + segmentId + " successfully");
}
@@ -131,19 +133,54 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
return true;
}
- private boolean hasLock(String serverName, String lockPath) {
- String lockServerName = null;
+ /**
+ *
+ * Returns <tt>true</tt> if, the job server is keeping the lock for the lockPath
+ *
+ * @param serverName the hostname of job server
+ *
+ * @param lockPath the zookeeper node path of segment
+ *
+ * @return <tt>true</tt> if the job server is keeping the lock for the lockPath, otherwise
+ * <tt>false</tt>
+ *
+ * @since 2.0
+ */
+
+ private boolean isKeepLock(String serverName, String lockPath) {
try {
if (zkClient.checkExists().forPath(lockPath) != null) {
byte[] data = zkClient.getData().forPath(lockPath);
- lockServerName = new String(data, Charset.forName("UTF-8"));
+ String lockServerName = new String(data, Charset.forName("UTF-8"));
+ return lockServerName.equalsIgnoreCase(serverName);
}
} catch (Exception e) {
logger.error("fail to get the serverName for the path: " + lockPath, e);
}
- if(lockServerName == null)
- return false;
- return lockServerName.equalsIgnoreCase(serverName);
+ return false;
+ }
+
+ /**
+ *
+ * Returns <tt>true</tt> if, and only if, the segment has been locked.
+ *
+ * @param segmentId the id of segment need to release the lock.
+ *
+ * @return <tt>true</tt> if the segment has been locked, otherwise
+ * <tt>false</tt>
+ *
+ * @since 2.0
+ */
+
+ @Override
+ public boolean isHasLocked(String segmentId) {
+ String lockPath = getLockPath(segmentId);
+ try {
+ return zkClient.checkExists().forPath(lockPath) != null;
+ } catch (Exception e) {
+ logger.error("fail to get the path: " + lockPath, e);
+ }
+ return false;
}
/**
@@ -151,7 +188,9 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
*
* <p> the segment related zookeeper node will be deleted.
*
- * @param segmentId the name of segment need to release the lock
+ * @param segmentId the id of segment need to release the lock
+ *
+ * @since 2.0
*/
@Override
@@ -177,7 +216,10 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
* in order to when one job server is down, other job server can take over the running jobs.
*
* @param pool the threadPool watching the zookeeper node change
+ *
* @param doWatch do the concrete action with the zookeeper node path and zookeeper node data
+ *
+ * @since 2.0
*/
@Override
@@ -229,9 +271,8 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
@Override
public void unlock() {
-
}
-
+
public void close() {
try {
childrenCache.close();
[06/10] kylin git commit: KYLIN-2410 let global dictionary use hadoop
conf in mapper/reducer context
Posted by bi...@apache.org.
KYLIN-2410 let global dictionary use hadoop conf in mapper/reducer context
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/eddb695b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/eddb695b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/eddb695b
Branch: refs/heads/KYLIN-2394
Commit: eddb695ba13de282ada123f358aff723cb414316
Parents: d52bb8b
Author: Yang Li <li...@apache.org>
Authored: Thu Jan 19 08:56:22 2017 +0800
Committer: Yang Li <li...@apache.org>
Committed: Thu Jan 19 08:56:22 2017 +0800
----------------------------------------------------------------------
core-common/pom.xml | 5 +
.../apache/kylin/common/util/HadoopUtil.java | 146 ++++++++++++++++++
.../apache/kylin/dict/AppendTrieDictionary.java | 13 +-
.../kylin/dict/AppendTrieDictionaryChecker.java | 13 +-
.../org/apache/kylin/dict/CachedTreeMap.java | 7 +-
.../kylin/dict/AppendTrieDictionaryTest.java | 6 +-
.../apache/kylin/dict/CachedTreeMapTest.java | 8 +-
.../apache/kylin/engine/mr/DFSFileTable.java | 1 +
.../kylin/engine/mr/DFSFileTableReader.java | 1 +
.../org/apache/kylin/engine/mr/HadoopUtil.java | 150 -------------------
.../org/apache/kylin/engine/mr/KylinMapper.java | 1 +
.../apache/kylin/engine/mr/KylinReducer.java | 1 +
.../kylin/engine/mr/SortedColumnDFSFile.java | 1 +
.../engine/mr/common/AbstractHadoopJob.java | 2 +-
.../kylin/engine/mr/common/CubeStatsReader.java | 2 +-
.../engine/mr/common/MapReduceExecutable.java | 2 +-
.../engine/mr/steps/CreateDictionaryJob.java | 2 +-
.../mr/steps/FactDistinctColumnsReducer.java | 2 +-
.../kylin/engine/mr/steps/InMemCuboidJob.java | 2 +-
.../engine/mr/steps/MergeStatisticsStep.java | 2 +-
.../steps/RowKeyDistributionCheckerMapper.java | 2 +-
.../engine/mr/steps/SaveStatisticsStep.java | 2 +-
.../mr/steps/UpdateCubeInfoAfterBuildStep.java | 2 +-
.../steps/FactDistinctColumnsReducerTest.java | 2 +-
.../HiveToBaseCuboidMapperPerformanceTest.java | 2 +-
.../engine/mr/steps/MergeCuboidJobTest.java | 2 +-
.../kylin/engine/mr/steps/NDCuboidJobTest.java | 2 +-
.../kylin/engine/spark/SparkCubingByLayer.java | 2 +-
.../kylin/cube/ITDictionaryManagerTest.java | 2 +-
.../kylin/provision/BuildCubeWithEngine.java | 2 +-
.../apache/kylin/query/ITMassInQueryTest.java | 2 +-
.../kylin/storage/hbase/ITHdfsOpsTest.java | 2 +-
.../rest/controller/StreamingController.java | 2 +-
.../apache/kylin/rest/service/ModelService.java | 2 +-
.../apache/kylin/rest/service/TableService.java | 2 +-
.../apache/kylin/source/hive/HiveMRInput.java | 2 +-
.../source/hive/HiveSourceTableLoader.java | 2 +-
.../apache/kylin/source/kafka/KafkaMRInput.java | 2 +-
.../kylin/storage/hbase/HBaseConnection.java | 2 +-
.../kylin/storage/hbase/HBaseResourceStore.java | 2 +-
.../storage/hbase/steps/DeprecatedGCStep.java | 2 +-
.../steps/HDFSPathGarbageCollectionStep.java | 2 +-
.../hbase/steps/SequenceFileCuboidWriter.java | 2 +-
.../storage/hbase/util/CubeMigrationCLI.java | 2 +-
.../storage/hbase/util/StorageCleanupJob.java | 2 +-
.../storage/hbase/steps/CreateHTableTest.java | 2 +-
.../hbase/steps/CubeHFileMapper2Test.java | 2 +-
.../steps/RangeKeyDistributionJobTest.java | 2 +-
.../org/apache/kylin/tool/CubeMigrationCLI.java | 2 +-
.../apache/kylin/tool/MrJobInfoExtractor.java | 2 +-
.../apache/kylin/tool/StorageCleanupJob.java | 2 +-
.../org/apache/kylin/tool/util/ToolUtil.java | 2 +-
52 files changed, 216 insertions(+), 215 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 25b10a7..016d470 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -78,6 +78,11 @@
<!-- Env & Test -->
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
new file mode 100644
index 0000000..390c209
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.kylin.common.KylinConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HadoopUtil {
+ @SuppressWarnings("unused")
+ private static final Logger logger = LoggerFactory.getLogger(HadoopUtil.class);
+ private static final transient ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>();
+
+ public static void setCurrentConfiguration(Configuration conf) {
+ hadoopConfig.set(conf);
+ }
+
+ public static Configuration getCurrentConfiguration() {
+ if (hadoopConfig.get() == null) {
+ Configuration conf = healSickConfig(new Configuration());
+ // do not cache this conf, or will affect following mr jobs
+ return conf;
+ }
+ Configuration conf = hadoopConfig.get();
+ return conf;
+ }
+
+ private static Configuration healSickConfig(Configuration conf) {
+ // https://issues.apache.org/jira/browse/KYLIN-953
+ if (StringUtils.isBlank(conf.get("hadoop.tmp.dir"))) {
+ conf.set("hadoop.tmp.dir", "/tmp");
+ }
+ if (StringUtils.isBlank(conf.get("hbase.fs.tmp.dir"))) {
+ conf.set("hbase.fs.tmp.dir", "/tmp");
+ }
+ return conf;
+ }
+
+ public static FileSystem getWorkingFileSystem() throws IOException {
+ return getFileSystem(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
+ }
+
+ public static FileSystem getWorkingFileSystem(Configuration conf) throws IOException {
+ Path workingPath = new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
+ return getFileSystem(workingPath, conf);
+ }
+
+ public static FileSystem getFileSystem(String path) throws IOException {
+ return getFileSystem(new Path(makeURI(path)));
+ }
+
+ public static FileSystem getFileSystem(Path path) throws IOException {
+ Configuration conf = getCurrentConfiguration();
+ return getFileSystem(path, conf);
+ }
+
+ public static FileSystem getFileSystem(Path path, Configuration conf) {
+ if (StringUtils.isBlank(path.toUri().getScheme()))
+ throw new IllegalArgumentException("Path must be qualified: " + path);
+
+ try {
+ return path.getFileSystem(conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static URI makeURI(String filePath) {
+ try {
+ return new URI(fixWindowsPath(filePath));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("Cannot create FileSystem from URI: " + filePath, e);
+ }
+ }
+
+ public static String fixWindowsPath(String path) {
+ // fix windows path
+ if (path.startsWith("file://") && !path.startsWith("file:///") && path.contains(":\\")) {
+ path = path.replace("file://", "file:///");
+ }
+ if (path.startsWith("file:///")) {
+ path = path.replace('\\', '/');
+ }
+ return path;
+ }
+
+ /**
+ * @param table the identifier of hive table, in format <db_name>.<table_name>
+ * @return a string array with 2 elements: {"db_name", "table_name"}
+ */
+ public static String[] parseHiveTableName(String table) {
+ int cut = table.indexOf('.');
+ String database = cut >= 0 ? table.substring(0, cut).trim() : "DEFAULT";
+ String tableName = cut >= 0 ? table.substring(cut + 1).trim() : table.trim();
+
+ return new String[] { database, tableName };
+ }
+
+ public static void deletePath(Configuration conf, Path path) throws IOException {
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
+ }
+
+ public static byte[] toBytes(Writable writable) {
+ try {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(bout);
+ writable.write(out);
+ out.close();
+ bout.close();
+ return bout.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
index faffcc0..962686d 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
@@ -51,6 +51,7 @@ import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metadata.MetadataManager;
import org.slf4j.Logger;
@@ -1163,20 +1164,18 @@ public class AppendTrieDictionary<T> extends CacheDictionary<T> {
@Override
public AppendTrieDictionary copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException {
//copy appendDict
- Configuration conf = new Configuration();
-
Path base = new Path(baseDir);
- FileSystem srcFs = FileSystem.get(base.toUri(), conf);
- Path srcPath = CachedTreeMap.getLatestVersion(conf, srcFs, base);
+ FileSystem srcFs = HadoopUtil.getFileSystem(base);
+ Path srcPath = CachedTreeMap.getLatestVersion(HadoopUtil.getCurrentConfiguration(), srcFs, base);
Path dstPath = new Path(srcPath.toString().replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()));
logger.info("Copy appendDict from {} to {}", srcPath, dstPath);
- FileSystem dstFs = FileSystem.get(dstPath.toUri(), conf);
+ FileSystem dstFs = HadoopUtil.getFileSystem(dstPath);
if (dstFs.exists(dstPath)) {
logger.info("Delete existing AppendDict {}", dstPath);
dstFs.delete(dstPath, true);
}
- FileUtil.copy(FileSystem.get(srcPath.toUri(), conf), srcPath, FileSystem.get(dstPath.toUri(), conf), dstPath, false, true, conf);
+ FileUtil.copy(srcFs, srcPath, dstFs, dstPath, false, true, HadoopUtil.getCurrentConfiguration());
// init new AppendTrieDictionary
AppendTrieDictionary newDict = new AppendTrieDictionary();
@@ -1194,7 +1193,7 @@ public class AppendTrieDictionary<T> extends CacheDictionary<T> {
@Override
public void readFields(DataInput in) throws IOException {
String baseDir = in.readUTF();
- Configuration conf = new Configuration();
+ Configuration conf = HadoopUtil.getCurrentConfiguration();
try (FSDataInputStream input = CachedTreeMap.openLatestIndexInput(conf, baseDir)) {
int baseId = input.readInt();
int maxId = input.readInt();
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
index f231275..4b3817a 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
@@ -17,16 +17,16 @@
*/
package org.apache.kylin.dict;
-import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.kylin.common.util.HadoopUtil;
/**
* Created by sunyerui on 16/11/15.
@@ -34,9 +34,8 @@ import java.util.List;
public class AppendTrieDictionaryChecker {
public boolean runChecker(String baseDir) throws IOException {
- Configuration conf = new Configuration();
Path basePath = new Path(baseDir);
- FileSystem fs = FileSystem.get(basePath.toUri(), conf);
+ FileSystem fs = HadoopUtil.getFileSystem(basePath);
List<Path> sliceList = new ArrayList<>();
List<Path> corruptedSliceList = new ArrayList<>();
listDictSlicePath(fs, fs.getFileStatus(basePath), sliceList);
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
index cc23261..ee69df7 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.kylin.common.util.HadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -145,12 +146,12 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
this.keepAppend = true;
this.maxVersions = maxVersions;
this.versionTTL = versionTTL;
- this.conf = new Configuration();
+ this.conf = HadoopUtil.getCurrentConfiguration();
if (basePath.endsWith("/")) {
basePath = basePath.substring(0, basePath.length()-1);
}
this.baseDir = new Path(basePath);
- this.fs = FileSystem.get(baseDir.toUri(), conf);
+ this.fs = HadoopUtil.getFileSystem(baseDir, conf);
if (!fs.exists(baseDir)) {
fs.mkdirs(baseDir);
}
@@ -447,7 +448,7 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
public static FSDataInputStream openLatestIndexInput(Configuration conf, String baseDir) throws IOException {
Path basePath = new Path(baseDir);
- FileSystem fs = FileSystem.get(basePath.toUri(), conf);
+ FileSystem fs = HadoopUtil.getFileSystem(basePath, conf);
Path indexPath = new Path(getLatestVersion(conf, fs, basePath), ".index");
return fs.open(indexPath, 8 * 1024 * 1024);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
index 18913d0..e2af338 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
@@ -40,9 +40,8 @@ import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.junit.After;
import org.junit.Before;
@@ -71,10 +70,9 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
}
public static void cleanup() {
- Configuration conf = new Configuration();
Path basePath = new Path(BASE_DIR);
try {
- FileSystem.get(basePath.toUri(), conf).delete(basePath, true);
+ HadoopUtil.getFileSystem(basePath).delete(basePath, true);
} catch (IOException e) {}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
index 64eb13c..ccf6e24 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
@@ -31,11 +31,11 @@ import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.kylin.common.util.HadoopUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Test;
@@ -118,10 +118,9 @@ public class CachedTreeMapTest {
public static final String workingDir = "/tmp/kylin_cachedtreemap_test/working";
private static void cleanup() {
- Configuration conf = new Configuration();
Path basePath = new Path(baseDir);
try {
- FileSystem.get(basePath.toUri(), conf).delete(basePath, true);
+ HadoopUtil.getFileSystem(basePath).delete(basePath, true);
} catch (IOException e) {}
VALUE_WRITE_ERROR_TOGGLE = false;
}
@@ -292,8 +291,7 @@ public class CachedTreeMapTest {
// move version dir to base dir, to simulate the older format
Path versionPath = new Path(map.getLatestVersion());
Path tmpVersionPath = new Path(versionPath.getParent().getParent(), versionPath.getName());
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(versionPath.toUri(), conf);
+ FileSystem fs = HadoopUtil.getFileSystem(versionPath);
fs.rename(versionPath, tmpVersionPath);
fs.delete(new Path(baseDir), true);
fs.rename(tmpVersionPath, new Path(baseDir));
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
index ee932ac..19bc021 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.source.ReadableTable;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
index 847f4bf..8c6b5f5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.StringSplitter;
import org.apache.kylin.source.ReadableTable.TableReader;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
deleted file mode 100644
index 7665350..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.io.Writable;
-import org.apache.kylin.common.KylinConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class HadoopUtil {
- @SuppressWarnings("unused")
- private static final Logger logger = LoggerFactory.getLogger(HadoopUtil.class);
- private static final transient ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>();
-
- public static void setCurrentConfiguration(Configuration conf) {
- hadoopConfig.set(conf);
- }
-
- public static Configuration getCurrentConfiguration() {
- if (hadoopConfig.get() == null) {
- Configuration conf = healSickConfig(new Configuration());
- // do not cache this conf, or will affect following mr jobs
- return conf;
- }
- Configuration conf = hadoopConfig.get();
- return conf;
- }
-
- private static Configuration healSickConfig(Configuration conf) {
- // why we have this hard code?
- conf.set(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, "8");
-
- // https://issues.apache.org/jira/browse/KYLIN-953
- if (StringUtils.isBlank(conf.get("hadoop.tmp.dir"))) {
- conf.set("hadoop.tmp.dir", "/tmp");
- }
- if (StringUtils.isBlank(conf.get("hbase.fs.tmp.dir"))) {
- conf.set("hbase.fs.tmp.dir", "/tmp");
- }
- return conf;
- }
-
- public static FileSystem getWorkingFileSystem() throws IOException {
- return getFileSystem(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
- }
-
- public static FileSystem getWorkingFileSystem(Configuration conf) throws IOException {
- Path workingPath = new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
- return getFileSystem(workingPath, conf);
- }
-
- public static FileSystem getFileSystem(String path) throws IOException {
- return getFileSystem(new Path(makeURI(path)));
- }
-
- public static FileSystem getFileSystem(Path path) throws IOException {
- Configuration conf = getCurrentConfiguration();
- return getFileSystem(path, conf);
- }
-
- public static FileSystem getFileSystem(Path path, Configuration conf) {
- if (StringUtils.isBlank(path.toUri().getScheme()))
- throw new IllegalArgumentException("Path must be qualified: " + path);
-
- try {
- return path.getFileSystem(conf);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static URI makeURI(String filePath) {
- try {
- return new URI(fixWindowsPath(filePath));
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException("Cannot create FileSystem from URI: " + filePath, e);
- }
- }
-
- public static String fixWindowsPath(String path) {
- // fix windows path
- if (path.startsWith("file://") && !path.startsWith("file:///") && path.contains(":\\")) {
- path = path.replace("file://", "file:///");
- }
- if (path.startsWith("file:///")) {
- path = path.replace('\\', '/');
- }
- return path;
- }
-
- /**
- * @param table the identifier of hive table, in format <db_name>.<table_name>
- * @return a string array with 2 elements: {"db_name", "table_name"}
- */
- public static String[] parseHiveTableName(String table) {
- int cut = table.indexOf('.');
- String database = cut >= 0 ? table.substring(0, cut).trim() : "DEFAULT";
- String tableName = cut >= 0 ? table.substring(cut + 1).trim() : table.trim();
-
- return new String[] { database, tableName };
- }
-
- public static void deletePath(Configuration conf, Path path) throws IOException {
- FileSystem fs = FileSystem.get(path.toUri(), conf);
- if (fs.exists(path)) {
- fs.delete(path, true);
- }
- }
-
- public static byte[] toBytes(Writable writable) {
- try {
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(bout);
- writable.write(out);
- out.close();
- bout.close();
- return bout.toByteArray();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
index 2b564e9..1595bdd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
index cb2d7a7..d428757 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
index 62c309a..d3f5cdc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
@@ -24,6 +24,7 @@ import java.util.Comparator;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.dict.ByteComparator;
import org.apache.kylin.dict.StringBytesConverter;
import org.apache.kylin.metadata.datatype.DataType;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 1540acd..567c1d0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -59,12 +59,12 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.common.util.StringSplitter;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.metadata.model.TableDesc;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 4011915..dbe4554 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -43,6 +43,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.SumHelper;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -52,7 +53,6 @@ import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.cube.kv.RowKeyEncoder;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.FunctionDesc;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index be11e39..f887c4c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.constant.JobStepStatusEnum;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index b27d722..95d8cb1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -29,10 +29,10 @@ import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.cli.DictionaryGeneratorCLI;
import org.apache.kylin.dict.DictionaryProvider;
import org.apache.kylin.dict.DistinctColumnValuesProvider;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.SortedColumnDFSFile;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.metadata.model.TblColRef;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 3c7f951..711d991 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -38,12 +38,12 @@ import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.dict.IDictionaryBuilder;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.KylinReducer;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 1612866..7706bac 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -30,12 +30,12 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.ByteArrayWritable;
import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 54666d0..04d8231 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -37,10 +37,10 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsWriter;
import org.apache.kylin.job.exception.ExecuteException;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
index ee8da6b..a2efe04 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.mr.KylinMapper;
/**
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 6120270..2671042 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -27,9 +27,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.CubingJob.AlgorithmEnum;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index dcc9190..dc80399 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -24,11 +24,11 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
index f6f790e..c971cef 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
@@ -26,7 +26,7 @@ import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.mr.common.CubeStatsWriter;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java
index c1f50a8..9e03493 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.util.HadoopUtil;
import org.junit.Ignore;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
index 97b1ef2..2e2ebf9 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
@@ -26,8 +26,8 @@ import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
index 10ebd2b..989ed72 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
@@ -25,8 +25,8 @@ import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 3a664fc..d6790aa 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -31,6 +31,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
@@ -45,7 +46,6 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
index 188a97a..458703a 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
@@ -27,6 +27,7 @@ import java.io.PrintWriter;
import java.util.Set;
import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.model.CubeDesc;
@@ -34,7 +35,6 @@ import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.dict.DistinctColumnValuesProvider;
import org.apache.kylin.engine.mr.DFSFileTable;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.ReadableTable;
import org.junit.After;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 62978db..e02bf19 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -47,7 +48,6 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.job.DeployUtil;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
index 28cdb67..8859329 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
@@ -29,7 +29,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.util.HadoopUtil;
import org.dbunit.database.DatabaseConnection;
import org.dbunit.database.IDatabaseConnection;
import org.dbunit.dataset.ITable;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java
index 605b79e..01a71bf 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.util.HadoopUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index 0ced9ad..407ee2e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -23,8 +23,8 @@ import java.util.List;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.rest.exception.BadRequestException;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java b/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java
index 7dd7b0c..614f0c9 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java
@@ -22,8 +22,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.rest.constant.Constant;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index e3df60a..d4cb854 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -30,7 +30,7 @@ import java.util.UUID;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index bd53f9a..d665dc7 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -31,12 +31,12 @@ import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.BufferedLogger;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.HiveCmdBuilder;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
index 286ffac..87edfe4 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -26,8 +26,8 @@ import java.util.Set;
import java.util.UUID;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 2f4fded..c7987f2 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -31,10 +31,10 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.BatchConstants;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index cbf81b6..335bfe7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.StorageException;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.util.HadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 2a12984..6217350 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -48,7 +48,7 @@ import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.util.HadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
index fbe64d9..eacff9f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
@@ -32,8 +32,8 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.HiveCmdBuilder;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
index 89baf95..4c0747e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.ExecuteException;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
index 1d66d3e..5e6ad34 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
@@ -25,10 +25,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.steps.KVGTRecordWriter;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index 31864f6..68c0a39 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -47,6 +47,7 @@ import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.RestClient;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -55,7 +56,6 @@ import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.dict.lookup.SnapshotManager;
import org.apache.kylin.dict.lookup.SnapshotTable;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.metadata.cachesync.Broadcaster;
import org.apache.kylin.metadata.model.DataModelDesc;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index d1a74ad..62af2c9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -44,13 +44,13 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.HiveCmdBuilder;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CreateHTableTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CreateHTableTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CreateHTableTest.java
index f994886..71079c7 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CreateHTableTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CreateHTableTest.java
@@ -23,8 +23,8 @@ import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
index 77b18e2..f36a62c 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
@@ -29,10 +29,10 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.measure.MeasureCodec;
import org.junit.After;
import org.junit.Before;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
index 70e1ac7..1d75cfb 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
@@ -25,8 +25,8 @@ import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index 5269195..c8bff89 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -47,6 +47,7 @@ import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.RestClient;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -55,7 +56,6 @@ import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.dict.lookup.SnapshotManager;
import org.apache.kylin.dict.lookup.SnapshotTable;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.metadata.cachesync.Broadcaster;
import org.apache.kylin.metadata.model.DataModelDesc;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
index 1050bbe..ea19885 100644
--- a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
@@ -37,8 +37,8 @@ import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
index b3e2ec0..b48c076 100644
--- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
+++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
@@ -45,11 +45,11 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eddb695b/tool/src/main/java/org/apache/kylin/tool/util/ToolUtil.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/util/ToolUtil.java b/tool/src/main/java/org/apache/kylin/tool/util/ToolUtil.java
index 1312ca4..c41d6a8 100644
--- a/tool/src/main/java/org/apache/kylin/tool/util/ToolUtil.java
+++ b/tool/src/main/java/org/apache/kylin/tool/util/ToolUtil.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.storage.hbase.HBaseConnection;
import com.google.common.collect.Maps;
[08/10] kylin git commit: KYLIN-2394 Upgrade Calcite to 1.11 and
Avatica to 1.9.0
Posted by bi...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/dfb5fac9/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index cd784f4..874ead6 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -66,13 +66,15 @@
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
+
<artifactSet>
- <!-- jackson is already packaged into calcite-avatica.jar. To avoid including jackson
- twice, we include calcite-avatica which has jackson and exclude jackson. -->
<excludes>
- <exclude>com.fasterxml.jackson.core:*</exclude>
+ <exclude>com.google.protobuf:*</exclude>
+ <exclude>commons-logging:*</exclude>
+ <exclude>commons-codec:*</exclude>
</excludes>
</artifactSet>
+
<relocations>
<relocation>
<pattern>org.apache.calcite</pattern>
@@ -86,10 +88,6 @@
<pattern>org.apache.http</pattern>
<shadedPattern>${shadeBase}.org.apache.http</shadedPattern>
</relocation>
- <relocation>
- <pattern>org.apache.commons</pattern>
- <shadedPattern>${shadeBase}.org.apache.commons</shadedPattern>
- </relocation>
</relocations>
<filters>
<filter>
http://git-wip-us.apache.org/repos/asf/kylin/blob/dfb5fac9/jdbc/src/main/java/org/apache/kylin/jdbc/KylinMeta.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinMeta.java b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinMeta.java
index a1b9aef..8e69e68 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinMeta.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinMeta.java
@@ -209,7 +209,7 @@ public class KylinMeta extends MetaImpl {
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
- columns.add(columnMetaData(name, index, field.getType()));
+ columns.add(columnMetaData(name, index, field.getType(), true));
fields.add(field);
fieldNames.add(fieldName);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/dfb5fac9/kylin-it/pom.xml
----------------------------------------------------------------------
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 080558b..80af108 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -43,6 +43,12 @@
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>atopcalcite</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>avatica-core</artifactId>
+ <groupId>org.apache.calcite.avatica</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
@@ -74,14 +80,6 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-query</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.calcite</groupId>
- <artifactId>calcite-linq4j</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </dependency>
<!-- Env & Test -->
http://git-wip-us.apache.org/repos/asf/kylin/blob/dfb5fac9/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java b/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
index df6eb2f..05f615f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
@@ -21,6 +21,7 @@ package org.apache.kylin.jdbc;
import java.io.File;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
+import java.sql.Driver;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
http://git-wip-us.apache.org/repos/asf/kylin/blob/dfb5fac9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ff4c4e8..57b7752 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,8 +112,8 @@
<aspectj.version>1.8.9</aspectj.version>
<!-- Calcite Version -->
- <calcite.version>1.10.0</calcite.version>
- <avatica.version>1.8.0</avatica.version>
+ <calcite.version>1.11.0</calcite.version>
+ <avatica.version>1.9.0</avatica.version>
<!-- Sonar -->
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
@@ -450,15 +450,9 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.calcite</groupId>
- <artifactId>calcite-linq4j</artifactId>
- <version>${calcite.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica</artifactId>
<version>${avatica.version}</version>
-
</dependency>
<!-- Workaround for hive 0.14 avatica dependency -->
<dependency>
http://git-wip-us.apache.org/repos/asf/kylin/blob/dfb5fac9/query/pom.xml
----------------------------------------------------------------------
diff --git a/query/pom.xml b/query/pom.xml
index 1dc05d1..6ab74a7 100644
--- a/query/pom.xml
+++ b/query/pom.xml
@@ -36,16 +36,18 @@
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>atopcalcite</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica-core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-core-storage</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.calcite</groupId>
- <artifactId>calcite-core</artifactId>
- </dependency>
- <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
[04/10] kylin git commit: Revert KYLIN-2361,
Downgrade to Tomcat 7.0.69
Posted by bi...@apache.org.
Revert KYLIN-2361, Downgrade to Tomcat 7.0.69
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/51c65710
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/51c65710
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/51c65710
Branch: refs/heads/KYLIN-2394
Commit: 51c657108d4dae221a1d4d090ba68a1d0948e8f7
Parents: 131a3f3
Author: Billy Liu <bi...@apache.org>
Authored: Wed Jan 18 17:15:11 2017 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Wed Jan 18 17:15:11 2017 +0800
----------------------------------------------------------------------
build/script/download-tomcat.sh | 12 ++++++------
pom.xml | 2 +-
.../java/org/apache/kylin/rest/DebugTomcat.java | 16 +++++++---------
.../kylin/ext/CustomizedWebappClassloader.java | 6 +++---
4 files changed, 17 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/51c65710/build/script/download-tomcat.sh
----------------------------------------------------------------------
diff --git a/build/script/download-tomcat.sh b/build/script/download-tomcat.sh
index 403d87b..b3aa509 100755
--- a/build/script/download-tomcat.sh
+++ b/build/script/download-tomcat.sh
@@ -27,25 +27,25 @@ if [[ `uname -a` =~ "Darwin" ]]; then
alias md5cmd="md5 -q"
fi
-tomcat_pkg_version="8.5.9"
-tomcat_pkg_md5="b41270a64b7774c964e4bec813eea2ed"
+tomcat_pkg_version="7.0.69"
+tomcat_pkg_md5="10a071e5169a1a8b14ff35a0ad181052"
if [ ! -f "build/apache-tomcat-${tomcat_pkg_version}.tar.gz" ]
then
echo "no binary file found"
- wget --directory-prefix=build/ http://archive.apache.org/dist/tomcat/tomcat-8/v${tomcat_pkg_version}/bin/apache-tomcat-${tomcat_pkg_version}.tar.gz || echo "Download tomcat failed"
+ wget --directory-prefix=build/ http://archive.apache.org/dist/tomcat/tomcat-7/v${tomcat_pkg_version}/bin/apache-tomcat-${tomcat_pkg_version}.tar.gz || echo "Download tomcat failed"
else
if [ `md5cmd build/apache-tomcat-${tomcat_pkg_version}.tar.gz | awk '{print $1}'` != "${tomcat_pkg_md5}" ]
then
echo "md5 check failed"
rm build/apache-tomcat-${tomcat_pkg_version}.tar.gz
- wget --directory-prefix=build/ http://archive.apache.org/dist/tomcat/tomcat-8/v${tomcat_pkg_version}/bin/apache-tomcat-${tomcat_pkg_version}.tar.gz || echo "download tomcat failed"
+ wget --directory-prefix=build/ http://archive.apache.org/dist/tomcat/tomcat-7/v${tomcat_pkg_version}/bin/apache-tomcat-${tomcat_pkg_version}.tar.gz || echo "download tomcat failed"
fi
fi
unalias md5cmd
-tar -zxvf build/apache-tomcat-8.5.9.tar.gz -C build/
-mv build/apache-tomcat-8.5.9 build/tomcat
+tar -zxvf build/apache-tomcat-${tomcat_pkg_version}.tar.gz -C build/
+mv build/apache-tomcat-${tomcat_pkg_version} build/tomcat
rm -rf build/tomcat/webapps/*
mv build/tomcat/conf/server.xml build/tomcat/conf/server.xml.bak
http://git-wip-us.apache.org/repos/asf/kylin/blob/51c65710/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e8ccb6c..ff4c4e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,7 +101,7 @@
<cglib.version>3.2.4</cglib.version>
<supercsv.version>2.4.0</supercsv.version>
<cors.version>2.5</cors.version>
- <tomcat.version>8.5.9</tomcat.version>
+ <tomcat.version>7.0.69</tomcat.version>
<t-digest.version>3.1</t-digest.version>
<!-- REST Service -->
http://git-wip-us.apache.org/repos/asf/kylin/blob/51c65710/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
index 2cac82e..59b7a28 100644
--- a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
+++ b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
@@ -21,13 +21,11 @@ package org.apache.kylin.rest;
import org.apache.catalina.Context;
import org.apache.catalina.core.AprLifecycleListener;
import org.apache.catalina.core.StandardServer;
+import org.apache.catalina.deploy.ErrorPage;
import org.apache.catalina.startup.Tomcat;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.util.Shell;
import org.apache.kylin.common.KylinConfig;
-import org.apache.tomcat.JarScanFilter;
-import org.apache.tomcat.JarScanType;
-import org.apache.tomcat.util.descriptor.web.ErrorPage;
import java.io.File;
import java.lang.reflect.Field;
@@ -129,12 +127,12 @@ public class DebugTomcat {
notFound.setLocation("/index.html");
webContext.addErrorPage(notFound);
webContext.addWelcomeFile("index.html");
- webContext.getJarScanner().setJarScanFilter(new JarScanFilter() {
- @Override
- public boolean check(JarScanType arg0, String arg1) {
- return false;
- }
- });
+// webContext.getJarScanner().setJarScanFilter(new JarScanFilter() {
+// @Override
+// public boolean check(JarScanType arg0, String arg1) {
+// return false;
+// }
+// });
// tomcat start
tomcat.start();
http://git-wip-us.apache.org/repos/asf/kylin/blob/51c65710/tomcat-ext/src/main/java/org/apache/kylin/ext/CustomizedWebappClassloader.java
----------------------------------------------------------------------
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/CustomizedWebappClassloader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/CustomizedWebappClassloader.java
index 23bde2c..816601f 100644
--- a/tomcat-ext/src/main/java/org/apache/kylin/ext/CustomizedWebappClassloader.java
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/CustomizedWebappClassloader.java
@@ -45,7 +45,7 @@ public class CustomizedWebappClassloader extends ParallelWebappClassLoader {
* @param name class name
* @return true if the class should be filtered
*/
- protected boolean filter(String name, boolean isClassName) {
+ protected boolean filter(String name) {
if (name == null)
return false;
@@ -62,7 +62,7 @@ public class CustomizedWebappClassloader extends ParallelWebappClassLoader {
return true;
}
- return super.filter(name, isClassName);
-
+ //return super.filter(name, isClassName);
+ return false;
}
}
[07/10] kylin git commit: KYLIN-2410 remove force qualified path in
HadoopUtil.getFileSystem
Posted by bi...@apache.org.
KYLIN-2410 remove force qualified path in HadoopUtil.getFileSystem
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/61833d95
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/61833d95
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/61833d95
Branch: refs/heads/KYLIN-2394
Commit: 61833d952daa3ed7e3938f6e310caca5a52a4974
Parents: eddb695
Author: Yang Li <li...@apache.org>
Authored: Thu Jan 19 10:35:50 2017 +0800
Committer: Yang Li <li...@apache.org>
Committed: Thu Jan 19 12:42:44 2017 +0800
----------------------------------------------------------------------
.../src/main/java/org/apache/kylin/common/util/HadoopUtil.java | 3 ---
1 file changed, 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/61833d95/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 390c209..bdc4c3e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -82,9 +82,6 @@ public class HadoopUtil {
}
public static FileSystem getFileSystem(Path path, Configuration conf) {
- if (StringUtils.isBlank(path.toUri().getScheme()))
- throw new IllegalArgumentException("Path must be qualified: " + path);
-
try {
return path.getFileSystem(conf);
} catch (IOException e) {
[05/10] kylin git commit: KYLIN-2348 let choose model freely for
sub-queries
Posted by bi...@apache.org.
KYLIN-2348 let choose model freely for sub-queries
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d52bb8bc
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d52bb8bc
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d52bb8bc
Branch: refs/heads/KYLIN-2394
Commit: d52bb8bc05bd530362fe0970b8e8a85f8b2289bc
Parents: 51c6571
Author: Li Yang <li...@apache.org>
Authored: Wed Jan 18 22:46:24 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Wed Jan 18 22:46:24 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/query/relnode/OLAPTableScan.java | 2 ++
.../apache/kylin/query/routing/ModelChooser.java | 16 ++++++++--------
2 files changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/d52bb8bc/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
index 452170a..f7877be 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
@@ -53,6 +53,7 @@ import org.apache.calcite.rel.rules.JoinPushExpressionsRule;
import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
import org.apache.calcite.rel.rules.JoinUnionTransposeRule;
import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rel.rules.SemiJoinRule;
import org.apache.calcite.rel.rules.SortJoinTransposeRule;
import org.apache.calcite.rel.rules.SortUnionTransposeRule;
import org.apache.calcite.rel.type.RelDataType;
@@ -174,6 +175,7 @@ public class OLAPTableScan extends TableScan implements OLAPRel, EnumerableRel {
planner.removeRule(JoinUnionTransposeRule.RIGHT_UNION);
planner.removeRule(AggregateUnionTransposeRule.INSTANCE);
planner.removeRule(DateRangeRules.FILTER_INSTANCE);
+ planner.removeRule(SemiJoinRule.INSTANCE);
// distinct count will be split into a separated query that is joined with the left query
planner.removeRule(AggregateExpandDistinctAggregatesRule.INSTANCE);
http://git-wip-us.apache.org/repos/asf/kylin/blob/d52bb8bc/query/src/main/java/org/apache/kylin/query/routing/ModelChooser.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/ModelChooser.java b/query/src/main/java/org/apache/kylin/query/routing/ModelChooser.java
index 2517dc5..f979f7b 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/ModelChooser.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/ModelChooser.java
@@ -53,17 +53,17 @@ public class ModelChooser {
IdentityHashMap<OLAPContext, Set<IRealization>> candidates = new IdentityHashMap<>();
// attempt one model for all contexts
- Set<IRealization> reals = attemptSelectModel(contexts);
- if (reals != null) {
- for (OLAPContext ctx : contexts) {
- candidates.put(ctx, reals);
- }
- return candidates;
- }
+ // Set<IRealization> reals = attemptSelectModel(contexts);
+ // if (reals != null) {
+ // for (OLAPContext ctx : contexts) {
+ // candidates.put(ctx, reals);
+ // }
+ // return candidates;
+ // }
// try different model for different context
for (OLAPContext ctx : contexts) {
- reals = attemptSelectModel(ImmutableList.of(ctx));
+ Set<IRealization> reals = attemptSelectModel(ImmutableList.of(ctx));
if (reals == null)
throw new NoRealizationFoundException("No model found for" + toErrorMsg(ctx));
[02/10] kylin git commit: Opt some constant value
Posted by bi...@apache.org.
Opt some constant value
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6e303767
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6e303767
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6e303767
Branch: refs/heads/KYLIN-2394
Commit: 6e30376752d95bafa4a1773d39c2f798f75c35a7
Parents: 837bd82
Author: xiefan46 <95...@qq.com>
Authored: Tue Jan 17 10:34:20 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Jan 18 16:28:05 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/dict/TrieDictionaryForest.java | 77 +++++++++++++++-----
1 file changed, 57 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/6e303767/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java
index 04292d2..1023892 100755
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java
@@ -51,6 +51,14 @@ public class TrieDictionaryForest<T> extends CacheDictionary<T> {
private ArrayList<ByteArray> maxValue;
+ private int minId;
+
+ private int maxId;
+
+ private int sizeOfId;
+
+ private int sizeOfValue;
+
public TrieDictionaryForest() { // default constructor for Writable interface
}
@@ -65,42 +73,28 @@ public class TrieDictionaryForest<T> extends CacheDictionary<T> {
this.accuOffset = accuOffset;
this.bytesConvert = bytesConverter;
this.baseId = baseId;
- initMaxValue();
+ initConstantValue();
initForestCache();
}
@Override
public int getMinId() {
- if (trees.isEmpty())
- return baseId;
- return trees.get(0).getMinId() + baseId;
+ return this.minId;
}
@Override
public int getMaxId() {
- if (trees.isEmpty())
- return baseId - 1;
- int index = trees.size() - 1;
- int id = accuOffset.get(index) + trees.get(index).getMaxId() + baseId;
- return id;
+ return this.maxId;
}
@Override
public int getSizeOfId() {
- if (trees.isEmpty())
- return 1;
- int maxOffset = accuOffset.get(accuOffset.size() - 1);
- TrieDictionary<T> lastTree = trees.get(trees.size() - 1);
- int sizeOfId = BytesUtil.sizeForValue(baseId + maxOffset + lastTree.getMaxId() + 1L);
- return sizeOfId;
+ return this.sizeOfId;
}
@Override
public int getSizeOfValue() {
- int maxValue = 0;
- for (TrieDictionary<T> tree : trees)
- maxValue = Math.max(maxValue, tree.getSizeOfValue());
- return maxValue;
+ return this.sizeOfValue;
}
@Override
@@ -340,7 +334,16 @@ public class TrieDictionaryForest<T> extends CacheDictionary<T> {
}
}
- private void initMaxValue() throws IllegalStateException {
+ private void initConstantValue() throws IllegalStateException {
+ initMaxValueForEachTrie();
+ initMaxId();
+ initMinId();
+ initSizeOfId();
+ initSizeOfValue();
+ }
+
+ private void initMaxValueForEachTrie(){
+ //init max value
this.maxValue = new ArrayList<>();
if (this.trees == null || trees.isEmpty()) {
return;
@@ -353,6 +356,40 @@ public class TrieDictionaryForest<T> extends CacheDictionary<T> {
}
}
+ private void initMaxId(){
+ if (trees.isEmpty()) {
+ this.maxId = baseId - 1;
+ return;
+ }
+ int index = trees.size() - 1;
+ this.maxId = accuOffset.get(index) + trees.get(index).getMaxId() + baseId;
+ }
+
+ private void initMinId(){
+ if (trees.isEmpty()) {
+ this.minId = baseId;
+ return;
+ }
+ this.minId = trees.get(0).getMinId() + baseId;
+ }
+
+ private void initSizeOfId(){
+ if (trees.isEmpty()){
+ this.sizeOfId = 1;
+ return;
+ }
+ int maxOffset = accuOffset.get(accuOffset.size() - 1);
+ TrieDictionary<T> lastTree = trees.get(trees.size() - 1);
+ this.sizeOfId = BytesUtil.sizeForValue(baseId + maxOffset + lastTree.getMaxId() + 1L);
+ }
+
+ private void initSizeOfValue(){
+ int maxValue = 0;
+ for (TrieDictionary<T> tree : trees)
+ maxValue = Math.max(maxValue, tree.getSizeOfValue());
+ this.sizeOfValue = maxValue;
+ }
+
private void initForestCache() {
enableCache();
for (TrieDictionary<T> tree : trees) { //disable duplicate cache
[09/10] kylin git commit: KYLIN-2394 Upgrade Calcite to 1.11 and
Avatica to 1.9.0
Posted by bi...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/dfb5fac9/atopcalcite/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
----------------------------------------------------------------------
diff --git a/atopcalcite/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/atopcalcite/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index e0782ce..171e299 100644
--- a/atopcalcite/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ b/atopcalcite/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -16,24 +16,6 @@
*/
package org.apache.calcite.sql2rel;
-import static org.apache.calcite.sql.SqlUtil.stripAs;
-import static org.apache.calcite.util.Static.RESOURCE;
-
-import java.lang.reflect.Type;
-import java.math.BigDecimal;
-import java.util.AbstractList;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
import org.apache.calcite.avatica.util.Spaces;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.plan.Convention;
@@ -85,7 +67,6 @@ import org.apache.calcite.rel.stream.Delta;
import org.apache.calcite.rel.stream.LogicalDelta;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
@@ -116,6 +97,7 @@ import org.apache.calcite.sql.SqlCallBinding;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlDelete;
import org.apache.calcite.sql.SqlDynamicParam;
+import org.apache.calcite.sql.SqlExplainFormat;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlIdentifier;
@@ -177,17 +159,37 @@ import org.apache.calcite.util.NumberUtil;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
import org.apache.calcite.util.trace.CalciteTrace;
-import org.slf4j.Logger;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+
+import org.slf4j.Logger;
+
+import java.lang.reflect.Type;
+import java.math.BigDecimal;
+import java.util.AbstractList;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.apache.calcite.sql.SqlUtil.stripAs;
+import static org.apache.calcite.util.Static.RESOURCE;
/*
* The code has synced with calcite. Hope one day, we could remove the hardcode override point.
@@ -209,42 +211,56 @@ import com.google.common.collect.Sets;
public class SqlToRelConverter {
//~ Static fields/initializers ---------------------------------------------
+ protected static final Logger SQL2REL_LOGGER =
+ CalciteTrace.getSqlToRelTracer();
+
+ private static final BigDecimal TWO = BigDecimal.valueOf(2L);
+
/** Size of the smallest IN list that will be converted to a semijoin to a
* static table. */
- public static final int DEFAULT_IN_SUBQUERY_THRESHOLD = 20;
- protected static final Logger SQL2REL_LOGGER = CalciteTrace.getSqlToRelTracer();
- private static final BigDecimal TWO = BigDecimal.valueOf(2L);
+ public static final int DEFAULT_IN_SUB_QUERY_THRESHOLD = 20;
+
+ @Deprecated // to be removed before 2.0
+ public static final int DEFAULT_IN_SUBQUERY_THRESHOLD =
+ DEFAULT_IN_SUB_QUERY_THRESHOLD;
//~ Instance fields --------------------------------------------------------
- public final SqlToRelConverter.Config config;
- public final RelOptTable.ViewExpander viewExpander;
+
protected final SqlValidator validator;
protected final RexBuilder rexBuilder;
protected final Prepare.CatalogReader catalogReader;
protected final RelOptCluster cluster;
+ private DefaultValueFactory defaultValueFactory;
+ private SubQueryConverter subQueryConverter;
protected final List<RelNode> leaves = new ArrayList<>();
- protected final RelDataTypeFactory typeFactory;
private final List<SqlDynamicParam> dynamicParamSqlNodes = new ArrayList<>();
private final SqlOperatorTable opTab;
+ protected final RelDataTypeFactory typeFactory;
private final SqlNodeToRexConverter exprConverter;
+ private int explainParamCount;
+ public final SqlToRelConverter.Config config;
+
/**
- * Fields used in name resolution for correlated subqueries.
+ * Fields used in name resolution for correlated sub-queries.
*/
- private final Map<CorrelationId, DeferredLookup> mapCorrelToDeferred = new HashMap<>();
+ private final Map<CorrelationId, DeferredLookup> mapCorrelToDeferred =
+ new HashMap<>();
+
/**
* Stack of names of datasets requested by the <code>
* TABLE(SAMPLE(<datasetName>, <query>))</code> construct.
*/
private final Deque<String> datasetStack = new ArrayDeque<>();
+
/**
- * Mapping of non-correlated subqueries that have been converted to their
- * equivalent constants. Used to avoid re-evaluating the subquery if it's
+ * Mapping of non-correlated sub-queries that have been converted to their
+ * equivalent constants. Used to avoid re-evaluating the sub-query if it's
* already been evaluated.
*/
- private final Map<SqlNode, RexNode> mapConvertedNonCorrSubqs = new HashMap<>();
- private DefaultValueFactory defaultValueFactory;
- private SubqueryConverter subqueryConverter;
- private int explainParamCount;
+ private final Map<SqlNode, RexNode> mapConvertedNonCorrSubqs =
+ new HashMap<>();
+
+ public final RelOptTable.ViewExpander viewExpander;
//~ Constructors -----------------------------------------------------------
/**
@@ -258,23 +274,46 @@ public class SqlToRelConverter {
* @param convertletTable Expression converter
*/
@Deprecated // to be removed before 2.0
- public SqlToRelConverter(RelOptTable.ViewExpander viewExpander, SqlValidator validator, Prepare.CatalogReader catalogReader, RelOptPlanner planner, RexBuilder rexBuilder, SqlRexConvertletTable convertletTable) {
- this(viewExpander, validator, catalogReader, RelOptCluster.create(planner, rexBuilder), convertletTable, Config.DEFAULT);
+ public SqlToRelConverter(
+ RelOptTable.ViewExpander viewExpander,
+ SqlValidator validator,
+ Prepare.CatalogReader catalogReader,
+ RelOptPlanner planner,
+ RexBuilder rexBuilder,
+ SqlRexConvertletTable convertletTable) {
+ this(viewExpander, validator, catalogReader,
+ RelOptCluster.create(planner, rexBuilder), convertletTable,
+ Config.DEFAULT);
}
@Deprecated // to be removed before 2.0
- public SqlToRelConverter(RelOptTable.ViewExpander viewExpander, SqlValidator validator, Prepare.CatalogReader catalogReader, RelOptCluster cluster, SqlRexConvertletTable convertletTable) {
- this(viewExpander, validator, catalogReader, cluster, convertletTable, Config.DEFAULT);
+ public SqlToRelConverter(
+ RelOptTable.ViewExpander viewExpander,
+ SqlValidator validator,
+ Prepare.CatalogReader catalogReader,
+ RelOptCluster cluster,
+ SqlRexConvertletTable convertletTable) {
+ this(viewExpander, validator, catalogReader, cluster, convertletTable,
+ Config.DEFAULT);
}
/* Creates a converter. */
- public SqlToRelConverter(RelOptTable.ViewExpander viewExpander, SqlValidator validator, Prepare.CatalogReader catalogReader, RelOptCluster cluster, SqlRexConvertletTable convertletTable, Config config) {
+ public SqlToRelConverter(
+ RelOptTable.ViewExpander viewExpander,
+ SqlValidator validator,
+ Prepare.CatalogReader catalogReader,
+ RelOptCluster cluster,
+ SqlRexConvertletTable convertletTable,
+ Config config) {
this.viewExpander = viewExpander;
- this.opTab = (validator == null) ? SqlStdOperatorTable.instance() : validator.getOperatorTable();
+ this.opTab =
+ (validator
+ == null) ? SqlStdOperatorTable.instance()
+ : validator.getOperatorTable();
this.validator = validator;
this.catalogReader = catalogReader;
this.defaultValueFactory = new NullDefaultValueFactory();
- this.subqueryConverter = new NoOpSubqueryConverter();
+ this.subQueryConverter = new NoOpSubQueryConverter();
this.rexBuilder = cluster.getRexBuilder();
this.typeFactory = rexBuilder.getTypeFactory();
this.cluster = Preconditions.checkNotNull(cluster);
@@ -285,157 +324,6 @@ public class SqlToRelConverter {
//~ Methods ----------------------------------------------------------------
- private static boolean isStream(SqlNode query) {
- return query instanceof SqlSelect && ((SqlSelect) query).isKeywordPresent(SqlSelectKeyword.STREAM);
- }
-
- public static boolean isOrdered(SqlNode query) {
- switch (query.getKind()) {
- case SELECT:
- return ((SqlSelect) query).getOrderList() != null && ((SqlSelect) query).getOrderList().size() > 0;
- case WITH:
- return isOrdered(((SqlWith) query).body);
- case ORDER_BY:
- return ((SqlOrderBy) query).orderList.size() > 0;
- default:
- return false;
- }
- }
-
- /**
- * Returns whether a given node contains a {@link SqlInOperator}.
- *
- * @param node a RexNode tree
- */
- private static boolean containsInOperator(SqlNode node) {
- try {
- SqlVisitor<Void> visitor = new SqlBasicVisitor<Void>() {
- public Void visit(SqlCall call) {
- if (call.getOperator() instanceof SqlInOperator) {
- throw new Util.FoundOne(call);
- }
- return super.visit(call);
- }
- };
- node.accept(visitor);
- return false;
- } catch (Util.FoundOne e) {
- Util.swallow(e, null);
- return true;
- }
- }
-
- /**
- * Push down all the NOT logical operators into any IN/NOT IN operators.
- *
- * @param sqlNode the root node from which to look for NOT operators
- * @return the transformed SqlNode representation with NOT pushed down.
- */
- private static SqlNode pushDownNotForIn(SqlNode sqlNode) {
- if ((sqlNode instanceof SqlCall) && containsInOperator(sqlNode)) {
- SqlCall sqlCall = (SqlCall) sqlNode;
- if ((sqlCall.getOperator() == SqlStdOperatorTable.AND) || (sqlCall.getOperator() == SqlStdOperatorTable.OR)) {
- SqlNode[] sqlOperands = ((SqlBasicCall) sqlCall).operands;
- for (int i = 0; i < sqlOperands.length; i++) {
- sqlOperands[i] = pushDownNotForIn(sqlOperands[i]);
- }
- return sqlNode;
- } else if (sqlCall.getOperator() == SqlStdOperatorTable.NOT) {
- SqlNode childNode = sqlCall.operand(0);
- assert childNode instanceof SqlCall;
- SqlBasicCall childSqlCall = (SqlBasicCall) childNode;
- if (childSqlCall.getOperator() == SqlStdOperatorTable.AND) {
- SqlNode[] andOperands = childSqlCall.getOperands();
- SqlNode[] orOperands = new SqlNode[andOperands.length];
- for (int i = 0; i < orOperands.length; i++) {
- orOperands[i] = SqlStdOperatorTable.NOT.createCall(SqlParserPos.ZERO, andOperands[i]);
- }
- for (int i = 0; i < orOperands.length; i++) {
- orOperands[i] = pushDownNotForIn(orOperands[i]);
- }
- return SqlStdOperatorTable.OR.createCall(SqlParserPos.ZERO, orOperands[0], orOperands[1]);
- } else if (childSqlCall.getOperator() == SqlStdOperatorTable.OR) {
- SqlNode[] orOperands = childSqlCall.getOperands();
- SqlNode[] andOperands = new SqlNode[orOperands.length];
- for (int i = 0; i < andOperands.length; i++) {
- andOperands[i] = SqlStdOperatorTable.NOT.createCall(SqlParserPos.ZERO, orOperands[i]);
- }
- for (int i = 0; i < andOperands.length; i++) {
- andOperands[i] = pushDownNotForIn(andOperands[i]);
- }
- return SqlStdOperatorTable.AND.createCall(SqlParserPos.ZERO, andOperands[0], andOperands[1]);
- } else if (childSqlCall.getOperator() == SqlStdOperatorTable.NOT) {
- SqlNode[] notOperands = childSqlCall.getOperands();
- assert notOperands.length == 1;
- return pushDownNotForIn(notOperands[0]);
- } else if (childSqlCall.getOperator() instanceof SqlInOperator) {
- SqlNode[] inOperands = childSqlCall.getOperands();
- SqlInOperator inOp = (SqlInOperator) childSqlCall.getOperator();
- if (inOp.isNotIn()) {
- return SqlStdOperatorTable.IN.createCall(SqlParserPos.ZERO, inOperands[0], inOperands[1]);
- } else {
- return SqlStdOperatorTable.NOT_IN.createCall(SqlParserPos.ZERO, inOperands[0], inOperands[1]);
- }
- } else {
- // childSqlCall is "leaf" node in a logical expression tree
- // (only considering AND, OR, NOT)
- return sqlNode;
- }
- } else {
- // sqlNode is "leaf" node in a logical expression tree
- // (only considering AND, OR, NOT)
- return sqlNode;
- }
- } else {
- // tree rooted at sqlNode does not contain inOperator
- return sqlNode;
- }
- }
-
- private static boolean containsNullLiteral(SqlNodeList valueList) {
- for (SqlNode node : valueList.getList()) {
- if (node instanceof SqlLiteral) {
- SqlLiteral lit = (SqlLiteral) node;
- if (lit.getValue() == null) {
- return true;
- }
- }
- }
- return false;
- }
-
- private static JoinRelType convertJoinType(JoinType joinType) {
- switch (joinType) {
- case COMMA:
- case INNER:
- case CROSS:
- return JoinRelType.INNER;
- case FULL:
- return JoinRelType.FULL;
- case LEFT:
- return JoinRelType.LEFT;
- case RIGHT:
- return JoinRelType.RIGHT;
- default:
- throw Util.unexpected(joinType);
- }
- }
-
- private static boolean desc(RelFieldCollation.Direction direction) {
- switch (direction) {
- case DESCENDING:
- case STRICTLY_DESCENDING:
- return true;
- default:
- return false;
- }
- }
-
- /** Creates a builder for a {@link Config}. */
- public static ConfigBuilder configBuilder() {
- return new ConfigBuilder();
- }
-
/**
* @return the RelOptCluster in use.
*/
@@ -490,7 +378,7 @@ public class SqlToRelConverter {
}
/**
- * @return mapping of non-correlated subqueries that have been converted to
+ * @return mapping of non-correlated sub-queries that have been converted to
* the constants that they evaluate to
*/
public Map<SqlNode, RexNode> getMapConvertedNonCorrSubqs() {
@@ -498,13 +386,14 @@ public class SqlToRelConverter {
}
/**
- * Adds to the current map of non-correlated converted subqueries the
- * elements from another map that contains non-correlated subqueries that
+ * Adds to the current map of non-correlated converted sub-queries the
+ * elements from another map that contains non-correlated sub-queries that
* have been converted by another SqlToRelConverter.
*
* @param alreadyConvertedNonCorrSubqs the other map
*/
- public void addConvertedNonCorrSubqs(Map<SqlNode, RexNode> alreadyConvertedNonCorrSubqs) {
+ public void addConvertedNonCorrSubqs(
+ Map<SqlNode, RexNode> alreadyConvertedNonCorrSubqs) {
mapConvertedNonCorrSubqs.putAll(alreadyConvertedNonCorrSubqs);
}
@@ -519,13 +408,13 @@ public class SqlToRelConverter {
}
/**
- * Sets a new SubqueryConverter. To have any effect, this must be called
+ * Sets a new SubQueryConverter. To have any effect, this must be called
* before any convert method.
*
- * @param converter new SubqueryConverter
+ * @param converter new SubQueryConverter
*/
- public void setSubqueryConverter(SubqueryConverter converter) {
- subqueryConverter = converter;
+ public void setSubQueryConverter(SubQueryConverter converter) {
+ subQueryConverter = converter;
}
/**
@@ -548,24 +437,42 @@ public class SqlToRelConverter {
// SQL statement is something like an INSERT which has no
// validator type information associated with its result,
// hence the namespace check above.)
- final List<RelDataTypeField> validatedFields = validator.getValidatedNodeType(query).getFieldList();
- final RelDataType validatedRowType = validator.getTypeFactory().createStructType(Pair.right(validatedFields), SqlValidatorUtil.uniquify(Pair.left(validatedFields), catalogReader.isCaseSensitive()));
-
- final List<RelDataTypeField> convertedFields = result.getRowType().getFieldList().subList(0, validatedFields.size());
- final RelDataType convertedRowType = validator.getTypeFactory().createStructType(convertedFields);
-
- if (!RelOptUtil.equal("validated row type", validatedRowType, "converted row type", convertedRowType, Litmus.IGNORE)) {
- throw new AssertionError("Conversion to relational algebra failed to " + "preserve datatypes:\n" + "validated type:\n" + validatedRowType.getFullTypeString() + "\nconverted type:\n" + convertedRowType.getFullTypeString() + "\nrel:\n" + RelOptUtil.toString(result));
+ final List<RelDataTypeField> validatedFields =
+ validator.getValidatedNodeType(query).getFieldList();
+ final RelDataType validatedRowType =
+ validator.getTypeFactory().createStructType(
+ Pair.right(validatedFields),
+ SqlValidatorUtil.uniquify(Pair.left(validatedFields),
+ catalogReader.isCaseSensitive()));
+
+ final List<RelDataTypeField> convertedFields =
+ result.getRowType().getFieldList().subList(0, validatedFields.size());
+ final RelDataType convertedRowType =
+ validator.getTypeFactory().createStructType(convertedFields);
+
+ if (!RelOptUtil.equal("validated row type", validatedRowType,
+ "converted row type", convertedRowType, Litmus.IGNORE)) {
+ throw new AssertionError("Conversion to relational algebra failed to "
+ + "preserve datatypes:\n"
+ + "validated type:\n"
+ + validatedRowType.getFullTypeString()
+ + "\nconverted type:\n"
+ + convertedRowType.getFullTypeString()
+ + "\nrel:\n"
+ + RelOptUtil.toString(result));
}
}
- public RelNode flattenTypes(RelNode rootRel, boolean restructure) {
- RelStructuredTypeFlattener typeFlattener = new RelStructuredTypeFlattener(rexBuilder, createToRelContext());
- return typeFlattener.rewrite(rootRel, restructure);
+ public RelNode flattenTypes(
+ RelNode rootRel,
+ boolean restructure) {
+ RelStructuredTypeFlattener typeFlattener =
+ new RelStructuredTypeFlattener(rexBuilder, createToRelContext(), restructure);
+ return typeFlattener.rewrite(rootRel);
}
/**
- * If subquery is correlated and decorrelation is enabled, performs
+ * If sub-query is correlated and decorrelation is enabled, performs
* decorrelation.
*
* @param query Query
@@ -606,14 +513,21 @@ public class SqlToRelConverter {
// Trim fields that are not used by their consumer.
if (isTrimUnusedFields()) {
final RelFieldTrimmer trimmer = newFieldTrimmer();
- final List<RelCollation> collations = rootRel.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE);
+ final List<RelCollation> collations =
+ rootRel.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE);
rootRel = trimmer.trim(rootRel);
- if (!ordered && collations != null && !collations.isEmpty() && !collations.equals(ImmutableList.of(RelCollations.EMPTY))) {
- final RelTraitSet traitSet = rootRel.getTraitSet().replace(RelCollationTraitDef.INSTANCE, collations);
+ if (!ordered
+ && collations != null
+ && !collations.isEmpty()
+ && !collations.equals(ImmutableList.of(RelCollations.EMPTY))) {
+ final RelTraitSet traitSet = rootRel.getTraitSet()
+ .replace(RelCollationTraitDef.INSTANCE, collations);
rootRel = rootRel.copy(traitSet, rootRel.getInputs());
}
if (SQL2REL_LOGGER.isDebugEnabled()) {
- SQL2REL_LOGGER.debug(RelOptUtil.dumpPlan("Plan after trimming unused fields", rootRel, false, SqlExplainLevel.EXPPLAN_ATTRIBUTES));
+ SQL2REL_LOGGER.debug(
+ RelOptUtil.dumpPlan("Plan after trimming unused fields", rootRel,
+ SqlExplainFormat.TEXT, SqlExplainLevel.EXPPLAN_ATTRIBUTES));
}
}
return rootRel;
@@ -625,7 +539,8 @@ public class SqlToRelConverter {
* @return Field trimmer
*/
protected RelFieldTrimmer newFieldTrimmer() {
- final RelBuilder relBuilder = RelFactories.LOGICAL_BUILDER.create(cluster, null);
+ final RelBuilder relBuilder =
+ RelFactories.LOGICAL_BUILDER.create(cluster, null);
return new RelFieldTrimmer(validator, relBuilder);
}
@@ -640,14 +555,18 @@ public class SqlToRelConverter {
* will become a JDBC result set; <code>false</code> if
* the query will be part of a view.
*/
- public RelRoot convertQuery(SqlNode query, final boolean needsValidation, final boolean top) {
+ public RelRoot convertQuery(
+ SqlNode query,
+ final boolean needsValidation,
+ final boolean top) {
SqlNode origQuery = query; /* OVERRIDE POINT */
-
+
if (needsValidation) {
query = validator.validate(query);
}
- RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(cluster.getMetadataProvider()));
+ RelMetadataQuery.THREAD_PROVIDERS.set(
+ JaninoRelMetadataProvider.of(cluster.getMetadataProvider()));
RelNode result = convertQueryRecursive(query, top, null).rel;
if (top) {
if (isStream(query)) {
@@ -663,18 +582,23 @@ public class SqlToRelConverter {
checkConvertedType(query, result);
if (SQL2REL_LOGGER.isDebugEnabled()) {
- SQL2REL_LOGGER.debug(RelOptUtil.dumpPlan("Plan after converting SqlNode to RelNode", result, false, SqlExplainLevel.EXPPLAN_ATTRIBUTES));
+ SQL2REL_LOGGER.debug(
+ RelOptUtil.dumpPlan("Plan after converting SqlNode to RelNode",
+ result, SqlExplainFormat.TEXT,
+ SqlExplainLevel.EXPPLAN_ATTRIBUTES));
}
final RelDataType validatedRowType = validator.getValidatedNodeType(query);
- return hackSelectStar(origQuery, RelRoot.of(result, validatedRowType, query.getKind()).withCollation(collation));
+ RelRoot origResult = RelRoot.of(result, validatedRowType, query.getKind())
+ .withCollation(collation);
+ return hackSelectStar(origQuery, origResult);
}
/* OVERRIDE POINT */
private RelRoot hackSelectStar(SqlNode query, RelRoot root) {
/*
* Rel tree is like:
- *
+ *
* LogicalSort (optional)
* |- LogicalProject
* |- LogicalFilter (optional)
@@ -690,7 +614,7 @@ public class SqlToRelConverter {
} else {
return root;
}
-
+
RelNode input = rootPrj.getInput();
if (!(//
input.getClass().getSimpleName().equals("OLAPTableScan")//
@@ -700,13 +624,13 @@ public class SqlToRelConverter {
if (rootPrj.getRowType().getFieldCount() < input.getRowType().getFieldCount())
return root;
-
+
RelDataType inType = rootPrj.getRowType();
List<String> inFields = inType.getFieldNames();
List<RexNode> projExp = new ArrayList<>();
List<Pair<Integer, String>> projFields = new ArrayList<>();
- FieldInfoBuilder projTypeBuilder = getCluster().getTypeFactory().builder();
- FieldInfoBuilder validTypeBuilder = getCluster().getTypeFactory().builder();
+ RelDataTypeFactory.FieldInfoBuilder projTypeBuilder = getCluster().getTypeFactory().builder();
+ RelDataTypeFactory.FieldInfoBuilder validTypeBuilder = getCluster().getTypeFactory().builder();
for (int i = 0; i < inFields.size(); i++) {
if (!inFields.get(i).startsWith("_KY_")) {
projExp.add(rootPrj.getProjects().get(i));
@@ -721,15 +645,34 @@ public class SqlToRelConverter {
if (rootSort != null) {
rootSort = (LogicalSort) rootSort.copy(rootSort.getTraitSet(), rootPrj, rootSort.collation, rootSort.offset, rootSort.fetch);
}
-
+
RelDataType validRowType = getCluster().getTypeFactory().createStructType(validTypeBuilder);
root = new RelRoot(rootSort == null ? rootPrj : rootSort, validRowType, root.kind, projFields, root.collation);
-
+
validator.setValidatedNodeType(query, validRowType);
-
+
return root;
}
+ private static boolean isStream(SqlNode query) {
+ return query instanceof SqlSelect
+ && ((SqlSelect) query).isKeywordPresent(SqlSelectKeyword.STREAM);
+ }
+
+ public static boolean isOrdered(SqlNode query) {
+ switch (query.getKind()) {
+ case SELECT:
+ return ((SqlSelect) query).getOrderList() != null
+ && ((SqlSelect) query).getOrderList().size() > 0;
+ case WITH:
+ return isOrdered(((SqlWith) query).body);
+ case ORDER_BY:
+ return ((SqlOrderBy) query).orderList.size() > 0;
+ default:
+ return false;
+ }
+ }
+
private RelCollation requiredCollation(RelNode r) {
if (r instanceof Sort) {
return ((Sort) r).collation;
@@ -756,7 +699,8 @@ public class SqlToRelConverter {
/**
* Factory method for creating translation workspace.
*/
- protected Blackboard createBlackboard(SqlValidatorScope scope, Map<String, RexNode> nameToNodeMap, boolean top) {
+ protected Blackboard createBlackboard(SqlValidatorScope scope,
+ Map<String, RexNode> nameToNodeMap, boolean top) {
return new Blackboard(scope, nameToNodeMap, top);
}
@@ -764,25 +708,45 @@ public class SqlToRelConverter {
* Implementation of {@link #convertSelect(SqlSelect, boolean)};
* derived class may override.
*/
- protected void convertSelectImpl(final Blackboard bb, SqlSelect select) {
- convertFrom(bb, select.getFrom());
- convertWhere(bb, select.getWhere());
+ protected void convertSelectImpl(
+ final Blackboard bb,
+ SqlSelect select) {
+ convertFrom(
+ bb,
+ select.getFrom());
+ convertWhere(
+ bb,
+ select.getWhere());
final List<SqlNode> orderExprList = new ArrayList<>();
final List<RelFieldCollation> collationList = new ArrayList<>();
- gatherOrderExprs(bb, select, select.getOrderList(), orderExprList, collationList);
- final RelCollation collation = cluster.traitSet().canonize(RelCollations.of(collationList));
+ gatherOrderExprs(
+ bb,
+ select,
+ select.getOrderList(),
+ orderExprList,
+ collationList);
+ final RelCollation collation =
+ cluster.traitSet().canonize(RelCollations.of(collationList));
if (validator.isAggregate(select)) {
- convertAgg(bb, select, orderExprList);
+ convertAgg(
+ bb,
+ select,
+ orderExprList);
} else {
- convertSelectList(bb, select, orderExprList);
+ convertSelectList(
+ bb,
+ select,
+ orderExprList);
}
if (select.isDistinct()) {
distinctify(bb, true);
}
- convertOrder(select, bb, collation, orderExprList, select.getOffset(), select.getFetch());
+ convertOrder(
+ select, bb, collation, orderExprList, select.getOffset(),
+ select.getFetch());
bb.setRoot(bb.root, true);
}
@@ -798,7 +762,9 @@ public class SqlToRelConverter {
* @param bb Blackboard
* @param checkForDupExprs Check for duplicate expressions
*/
- private void distinctify(Blackboard bb, boolean checkForDupExprs) {
+ private void distinctify(
+ Blackboard bb,
+ boolean checkForDupExprs) {
// Look for duplicate expressions in the project.
// Say we have 'select x, y, x, z'.
// Then dups will be {[2, 0]}
@@ -832,7 +798,9 @@ public class SqlToRelConverter {
newProjects.add(RexInputRef.of2(i, fields));
}
}
- rel = LogicalProject.create(rel, Pair.left(newProjects), Pair.right(newProjects));
+ rel =
+ LogicalProject.create(rel, Pair.left(newProjects),
+ Pair.right(newProjects));
bb.root = rel;
distinctify(bb, false);
rel = bb.root;
@@ -843,21 +811,34 @@ public class SqlToRelConverter {
for (int i = 0; i < fields.size(); i++) {
final int origin = origins.get(i);
RelDataTypeField field = fields.get(i);
- undoProjects.add(Pair.of((RexNode) new RexInputRef(squished.get(origin), field.getType()), field.getName()));
+ undoProjects.add(
+ Pair.of(
+ (RexNode) new RexInputRef(
+ squished.get(origin), field.getType()),
+ field.getName()));
}
- rel = LogicalProject.create(rel, Pair.left(undoProjects), Pair.right(undoProjects));
- bb.setRoot(rel, false);
+ rel =
+ LogicalProject.create(rel, Pair.left(undoProjects),
+ Pair.right(undoProjects));
+ bb.setRoot(
+ rel,
+ false);
return;
}
// Usual case: all of the expressions in the SELECT clause are
// different.
- final ImmutableBitSet groupSet = ImmutableBitSet.range(rel.getRowType().getFieldCount());
- rel = createAggregate(bb, false, groupSet, ImmutableList.of(groupSet), ImmutableList.<AggregateCall> of());
-
- bb.setRoot(rel, false);
+ final ImmutableBitSet groupSet =
+ ImmutableBitSet.range(rel.getRowType().getFieldCount());
+ rel =
+ createAggregate(bb, false, groupSet, ImmutableList.of(groupSet),
+ ImmutableList.<AggregateCall>of());
+
+ bb.setRoot(
+ rel,
+ false);
}
private int findExpr(RexNode seek, List<RexNode> exprs, int count) {
@@ -882,16 +863,29 @@ public class SqlToRelConverter {
* returning first row
* @param fetch Expression for number of rows to fetch
*/
- protected void convertOrder(SqlSelect select, Blackboard bb, RelCollation collation, List<SqlNode> orderExprList, SqlNode offset, SqlNode fetch) {
- if (select.getOrderList() == null || select.getOrderList().getList().isEmpty()) {
+ protected void convertOrder(
+ SqlSelect select,
+ Blackboard bb,
+ RelCollation collation,
+ List<SqlNode> orderExprList,
+ SqlNode offset,
+ SqlNode fetch) {
+ if (select.getOrderList() == null
+ || select.getOrderList().getList().isEmpty()) {
assert collation.getFieldCollations().isEmpty();
- if ((offset == null || ((SqlLiteral) offset).bigDecimalValue().equals(BigDecimal.ZERO)) && fetch == null) {
+ if ((offset == null
+ || ((SqlLiteral) offset).bigDecimalValue().equals(BigDecimal.ZERO))
+ && fetch == null) {
return;
}
}
// Create a sorter using the previously constructed collations.
- bb.setRoot(LogicalSort.create(bb.root, collation, offset == null ? null : convertExpression(offset), fetch == null ? null : convertExpression(fetch)), false);
+ bb.setRoot(
+ LogicalSort.create(bb.root, collation,
+ offset == null ? null : convertExpression(offset),
+ fetch == null ? null : convertExpression(fetch)),
+ false);
// If extra expressions were added to the project list for sorting,
// add another project to remove them. But make the collation empty, because
@@ -901,300 +895,492 @@ public class SqlToRelConverter {
if (orderExprList.size() > 0 && !bb.top) {
final List<RexNode> exprs = new ArrayList<>();
final RelDataType rowType = bb.root.getRowType();
- final int fieldCount = rowType.getFieldCount() - orderExprList.size();
+ final int fieldCount =
+ rowType.getFieldCount() - orderExprList.size();
for (int i = 0; i < fieldCount; i++) {
exprs.add(rexBuilder.makeInputRef(bb.root, i));
}
- bb.setRoot(LogicalProject.create(bb.root, exprs, rowType.getFieldNames().subList(0, fieldCount)), false);
+ bb.setRoot(
+ LogicalProject.create(bb.root, exprs,
+ rowType.getFieldNames().subList(0, fieldCount)),
+ false);
}
}
/**
- * Converts a WHERE clause.
+ * Returns whether a given node contains a {@link SqlInOperator}.
*
- * @param bb Blackboard
- * @param where WHERE clause, may be null
+ * @param node a RexNode tree
*/
- private void convertWhere(final Blackboard bb, final SqlNode where) {
- if (where == null) {
- return;
- }
- SqlNode newWhere = pushDownNotForIn(where);
- replaceSubqueries(bb, newWhere, RelOptUtil.Logic.UNKNOWN_AS_FALSE);
- final RexNode convertedWhere = bb.convertExpression(newWhere);
-
- // only allocate filter if the condition is not TRUE
- if (convertedWhere.isAlwaysTrue()) {
- return;
- }
-
- final RelNode filter = RelOptUtil.createFilter(bb.root, convertedWhere);
- final RelNode r;
- final CorrelationUse p = getCorrelationUse(bb, filter);
- if (p != null) {
- assert p.r instanceof Filter;
- Filter f = (Filter) p.r;
- r = LogicalFilter.create(f.getInput(), f.getCondition(), ImmutableSet.of(p.id));
- } else {
- r = filter;
- }
-
- bb.setRoot(r, false);
- }
-
- private void replaceSubqueries(final Blackboard bb, final SqlNode expr, RelOptUtil.Logic logic) {
- findSubqueries(bb, expr, logic, false);
- for (SubQuery node : bb.subqueryList) {
- substituteSubquery(bb, node);
+ private static boolean containsInOperator(
+ SqlNode node) {
+ try {
+ SqlVisitor<Void> visitor =
+ new SqlBasicVisitor<Void>() {
+ public Void visit(SqlCall call) {
+ if (call.getOperator() instanceof SqlInOperator) {
+ throw new Util.FoundOne(call);
+ }
+ return super.visit(call);
+ }
+ };
+ node.accept(visitor);
+ return false;
+ } catch (Util.FoundOne e) {
+ Util.swallow(e, null);
+ return true;
}
}
- private void substituteSubquery(Blackboard bb, SubQuery subQuery) {
- final RexNode expr = subQuery.expr;
- if (expr != null) {
- // Already done.
- return;
- }
-
- final SqlBasicCall call;
- final RelNode rel;
- final SqlNode query;
- final Pair<RelNode, Boolean> converted;
- switch (subQuery.node.getKind()) {
- case CURSOR:
- convertCursor(bb, subQuery);
- return;
-
- case MULTISET_QUERY_CONSTRUCTOR:
- case MULTISET_VALUE_CONSTRUCTOR:
- case ARRAY_QUERY_CONSTRUCTOR:
- rel = convertMultisets(ImmutableList.of(subQuery.node), bb);
- subQuery.expr = bb.register(rel, JoinRelType.INNER);
- return;
-
- case IN:
- call = (SqlBasicCall) subQuery.node;
- query = call.operand(1);
- if (!config.isExpand() && !(query instanceof SqlNodeList)) {
- return;
- }
- final SqlNode leftKeyNode = call.operand(0);
-
- final List<RexNode> leftKeys;
- switch (leftKeyNode.getKind()) {
- case ROW:
- leftKeys = Lists.newArrayList();
- for (SqlNode sqlExpr : ((SqlBasicCall) leftKeyNode).getOperandList()) {
- leftKeys.add(bb.convertExpression(sqlExpr));
- }
- break;
- default:
- leftKeys = ImmutableList.of(bb.convertExpression(leftKeyNode));
- }
-
- final boolean isNotIn = ((SqlInOperator) call.getOperator()).isNotIn();
- if (query instanceof SqlNodeList) {
- SqlNodeList valueList = (SqlNodeList) query;
- if (!containsNullLiteral(valueList) && valueList.size() < getInSubqueryThreshold()) {
- // We're under the threshold, so convert to OR.
- subQuery.expr = convertInToOr(bb, leftKeys, valueList, isNotIn);
- return;
- }
-
- // Otherwise, let convertExists translate
- // values list into an inline table for the
- // reference to Q below.
- }
-
- // Project out the search columns from the left side
-
- // Q1:
- // "select from emp where emp.deptno in (select col1 from T)"
- //
- // is converted to
- //
- // "select from
- // emp inner join (select distinct col1 from T)) q
- // on emp.deptno = q.col1
- //
- // Q2:
- // "select from emp where emp.deptno not in (Q)"
- //
- // is converted to
- //
- // "select from
- // emp left outer join (select distinct col1, TRUE from T) q
- // on emp.deptno = q.col1
- // where emp.deptno <> null
- // and q.indicator <> TRUE"
- //
- final boolean outerJoin = bb.subqueryNeedsOuterJoin || isNotIn || subQuery.logic == RelOptUtil.Logic.TRUE_FALSE_UNKNOWN;
- final RelDataType targetRowType = SqlTypeUtil.promoteToRowType(typeFactory, validator.getValidatedNodeType(leftKeyNode), null);
- converted = convertExists(query, RelOptUtil.SubqueryType.IN, subQuery.logic, outerJoin, targetRowType);
- if (converted.right) {
- // Generate
- // emp CROSS JOIN (SELECT COUNT(*) AS c,
- // COUNT(deptno) AS ck FROM dept)
- final RelDataType longType = typeFactory.createSqlType(SqlTypeName.BIGINT);
- final RelNode seek = converted.left.getInput(0); // fragile
- final int keyCount = leftKeys.size();
- final List<Integer> args = ImmutableIntList.range(0, keyCount);
- LogicalAggregate aggregate = LogicalAggregate.create(seek, false, ImmutableBitSet.of(), null, ImmutableList.of(AggregateCall.create(SqlStdOperatorTable.COUNT, false, ImmutableList.<Integer> of(), -1, longType, null), AggregateCall.create(SqlStdOperatorTable.COUNT, false, args, -1, longType, null)));
- LogicalJoin join = LogicalJoin.create(bb.root, aggregate, rexBuilder.makeLiteral(true), ImmutableSet.<CorrelationId> of(), JoinRelType.INNER);
- bb.setRoot(join, false);
- }
- RexNode rex = bb.register(converted.left, outerJoin ? JoinRelType.LEFT : JoinRelType.INNER, leftKeys);
-
- subQuery.expr = translateIn(subQuery, bb.root, rex);
- if (isNotIn) {
- subQuery.expr = rexBuilder.makeCall(SqlStdOperatorTable.NOT, subQuery.expr);
+ /**
+ * Push down all the NOT logical operators into any IN/NOT IN operators.
+ *
+ * @param sqlNode the root node from which to look for NOT operators
+ * @return the transformed SqlNode representation with NOT pushed down.
+ */
+ private static SqlNode pushDownNotForIn(SqlNode sqlNode) {
+ if ((sqlNode instanceof SqlCall) && containsInOperator(sqlNode)) {
+ SqlCall sqlCall = (SqlCall) sqlNode;
+ if ((sqlCall.getOperator() == SqlStdOperatorTable.AND)
+ || (sqlCall.getOperator() == SqlStdOperatorTable.OR)) {
+ SqlNode[] sqlOperands = ((SqlBasicCall) sqlCall).operands;
+ for (int i = 0; i < sqlOperands.length; i++) {
+ sqlOperands[i] = pushDownNotForIn(sqlOperands[i]);
+ }
+ return sqlNode;
+ } else if (sqlCall.getOperator() == SqlStdOperatorTable.NOT) {
+ SqlNode childNode = sqlCall.operand(0);
+ assert childNode instanceof SqlCall;
+ SqlBasicCall childSqlCall = (SqlBasicCall) childNode;
+ if (childSqlCall.getOperator() == SqlStdOperatorTable.AND) {
+ SqlNode[] andOperands = childSqlCall.getOperands();
+ SqlNode[] orOperands = new SqlNode[andOperands.length];
+ for (int i = 0; i < orOperands.length; i++) {
+ orOperands[i] =
+ SqlStdOperatorTable.NOT.createCall(
+ SqlParserPos.ZERO,
+ andOperands[i]);
+ }
+ for (int i = 0; i < orOperands.length; i++) {
+ orOperands[i] = pushDownNotForIn(orOperands[i]);
+ }
+ return SqlStdOperatorTable.OR.createCall(SqlParserPos.ZERO,
+ orOperands[0], orOperands[1]);
+ } else if (childSqlCall.getOperator() == SqlStdOperatorTable.OR) {
+ SqlNode[] orOperands = childSqlCall.getOperands();
+ SqlNode[] andOperands = new SqlNode[orOperands.length];
+ for (int i = 0; i < andOperands.length; i++) {
+ andOperands[i] =
+ SqlStdOperatorTable.NOT.createCall(
+ SqlParserPos.ZERO,
+ orOperands[i]);
+ }
+ for (int i = 0; i < andOperands.length; i++) {
+ andOperands[i] = pushDownNotForIn(andOperands[i]);
+ }
+ return SqlStdOperatorTable.AND.createCall(SqlParserPos.ZERO,
+ andOperands[0], andOperands[1]);
+ } else if (childSqlCall.getOperator() == SqlStdOperatorTable.NOT) {
+ SqlNode[] notOperands = childSqlCall.getOperands();
+ assert notOperands.length == 1;
+ return pushDownNotForIn(notOperands[0]);
+ } else if (childSqlCall.getOperator() instanceof SqlInOperator) {
+ SqlNode[] inOperands = childSqlCall.getOperands();
+ SqlInOperator inOp =
+ (SqlInOperator) childSqlCall.getOperator();
+ if (inOp.isNotIn()) {
+ return SqlStdOperatorTable.IN.createCall(
+ SqlParserPos.ZERO,
+ inOperands[0],
+ inOperands[1]);
+ } else {
+ return SqlStdOperatorTable.NOT_IN.createCall(
+ SqlParserPos.ZERO,
+ inOperands[0],
+ inOperands[1]);
+ }
+ } else {
+ // childSqlCall is "leaf" node in a logical expression tree
+ // (only considering AND, OR, NOT)
+ return sqlNode;
+ }
+ } else {
+ // sqlNode is "leaf" node in a logical expression tree
+ // (only considering AND, OR, NOT)
+ return sqlNode;
}
+ } else {
+ // tree rooted at sqlNode does not contain inOperator
+ return sqlNode;
+ }
+ }
+
+ /**
+ * Converts a WHERE clause.
+ *
+ * @param bb Blackboard
+ * @param where WHERE clause, may be null
+ */
+ private void convertWhere(
+ final Blackboard bb,
+ final SqlNode where) {
+ if (where == null) {
return;
+ }
+ SqlNode newWhere = pushDownNotForIn(where);
+ replaceSubQueries(bb, newWhere, RelOptUtil.Logic.UNKNOWN_AS_FALSE);
+ final RexNode convertedWhere = bb.convertExpression(newWhere);
- case EXISTS:
- // "select from emp where exists (select a from T)"
- //
- // is converted to the following if the subquery is correlated:
- //
- // "select from emp left outer join (select AGG_TRUE() as indicator
- // from T group by corr_var) q where q.indicator is true"
- //
- // If there is no correlation, the expression is replaced with a
- // boolean indicating whether the subquery returned 0 or >= 1 row.
- call = (SqlBasicCall) subQuery.node;
- query = call.operand(0);
- if (!config.isExpand()) {
+ // only allocate filter if the condition is not TRUE
+ if (convertedWhere.isAlwaysTrue()) {
+ return;
+ }
+
+ final RelFactories.FilterFactory factory =
+ RelFactories.DEFAULT_FILTER_FACTORY;
+ final RelNode filter = factory.createFilter(bb.root, convertedWhere);
+ final RelNode r;
+ final CorrelationUse p = getCorrelationUse(bb, filter);
+ if (p != null) {
+ assert p.r instanceof Filter;
+ Filter f = (Filter) p.r;
+ r = LogicalFilter.create(f.getInput(), f.getCondition(),
+ ImmutableSet.of(p.id));
+ } else {
+ r = filter;
+ }
+
+ bb.setRoot(r, false);
+ }
+
+ private void replaceSubQueries(
+ final Blackboard bb,
+ final SqlNode expr,
+ RelOptUtil.Logic logic) {
+ findSubQueries(bb, expr, logic, false);
+ for (SubQuery node : bb.subQueryList) {
+ substituteSubQuery(bb, node);
+ }
+ }
+
+ private void substituteSubQuery(Blackboard bb, SubQuery subQuery) {
+ final RexNode expr = subQuery.expr;
+ if (expr != null) {
+ // Already done.
+ return;
+ }
+
+ final SqlBasicCall call;
+ final RelNode rel;
+ final SqlNode query;
+ final RelOptUtil.Exists converted;
+ switch (subQuery.node.getKind()) {
+ case CURSOR:
+ convertCursor(bb, subQuery);
return;
- }
- converted = convertExists(query, RelOptUtil.SubqueryType.EXISTS, subQuery.logic, true, null);
- assert !converted.right;
- if (convertNonCorrelatedSubQuery(subQuery, bb, converted.left, true)) {
+
+ case MULTISET_QUERY_CONSTRUCTOR:
+ case MULTISET_VALUE_CONSTRUCTOR:
+ case ARRAY_QUERY_CONSTRUCTOR:
+ rel = convertMultisets(ImmutableList.of(subQuery.node), bb);
+ subQuery.expr = bb.register(rel, JoinRelType.INNER);
return;
- }
- subQuery.expr = bb.register(converted.left, JoinRelType.LEFT);
- return;
- case SCALAR_QUERY:
- // Convert the subquery. If it's non-correlated, convert it
- // to a constant expression.
- if (!config.isExpand()) {
+ case IN:
+ call = (SqlBasicCall) subQuery.node;
+ query = call.operand(1);
+ if (!config.isExpand() && !(query instanceof SqlNodeList)) {
+ return;
+ }
+ final SqlNode leftKeyNode = call.operand(0);
+
+ final List<RexNode> leftKeys;
+ switch (leftKeyNode.getKind()) {
+ case ROW:
+ leftKeys = Lists.newArrayList();
+ for (SqlNode sqlExpr : ((SqlBasicCall) leftKeyNode).getOperandList()) {
+ leftKeys.add(bb.convertExpression(sqlExpr));
+ }
+ break;
+ default:
+ leftKeys = ImmutableList.of(bb.convertExpression(leftKeyNode));
+ }
+
+ final boolean notIn = ((SqlInOperator) call.getOperator()).isNotIn();
+ if (query instanceof SqlNodeList) {
+ SqlNodeList valueList = (SqlNodeList) query;
+ if (!containsNullLiteral(valueList)
+ && valueList.size() < config.getInSubQueryThreshold()) {
+ // We're under the threshold, so convert to OR.
+ subQuery.expr =
+ convertInToOr(
+ bb,
+ leftKeys,
+ valueList,
+ notIn);
+ return;
+ }
+
+ // Otherwise, let convertExists translate
+ // values list into an inline table for the
+ // reference to Q below.
+ }
+
+ // Project out the search columns from the left side
+
+ // Q1:
+ // "select from emp where emp.deptno in (select col1 from T)"
+ //
+ // is converted to
+ //
+ // "select from
+ // emp inner join (select distinct col1 from T)) q
+ // on emp.deptno = q.col1
+ //
+ // Q2:
+ // "select from emp where emp.deptno not in (Q)"
+ //
+ // is converted to
+ //
+ // "select from
+ // emp left outer join (select distinct col1, TRUE from T) q
+ // on emp.deptno = q.col1
+ // where emp.deptno <> null
+ // and q.indicator <> TRUE"
+ //
+ final RelDataType targetRowType =
+ SqlTypeUtil.promoteToRowType(typeFactory,
+ validator.getValidatedNodeType(leftKeyNode), null);
+ converted =
+ convertExists(query, RelOptUtil.SubQueryType.IN, subQuery.logic,
+ notIn, targetRowType);
+ if (converted.indicator) {
+ // Generate
+ // emp CROSS JOIN (SELECT COUNT(*) AS c,
+ // COUNT(deptno) AS ck FROM dept)
+ final RelDataType longType =
+ typeFactory.createSqlType(SqlTypeName.BIGINT);
+ final RelNode seek = converted.r.getInput(0); // fragile
+ final int keyCount = leftKeys.size();
+ final List<Integer> args = ImmutableIntList.range(0, keyCount);
+ LogicalAggregate aggregate =
+ LogicalAggregate.create(seek, false, ImmutableBitSet.of(), null,
+ ImmutableList.of(
+ AggregateCall.create(SqlStdOperatorTable.COUNT, false,
+ ImmutableList.<Integer>of(), -1, longType, null),
+ AggregateCall.create(SqlStdOperatorTable.COUNT, false,
+ args, -1, longType, null)));
+ LogicalJoin join =
+ LogicalJoin.create(bb.root, aggregate, rexBuilder.makeLiteral(true),
+ ImmutableSet.<CorrelationId>of(), JoinRelType.INNER);
+ bb.setRoot(join, false);
+ }
+ final RexNode rex =
+ bb.register(converted.r,
+ converted.outerJoin ? JoinRelType.LEFT : JoinRelType.INNER,
+ leftKeys);
+
+ RelOptUtil.Logic logic = subQuery.logic;
+ switch (logic) {
+ case TRUE_FALSE_UNKNOWN:
+ case UNKNOWN_AS_TRUE:
+ if (!converted.indicator) {
+ logic = RelOptUtil.Logic.TRUE_FALSE;
+ }
+ }
+ subQuery.expr = translateIn(logic, bb.root, rex);
+ if (notIn) {
+ subQuery.expr =
+ rexBuilder.makeCall(SqlStdOperatorTable.NOT, subQuery.expr);
+ }
return;
- }
- call = (SqlBasicCall) subQuery.node;
- query = call.operand(0);
- converted = convertExists(query, RelOptUtil.SubqueryType.SCALAR, subQuery.logic, true, null);
- assert !converted.right;
- if (convertNonCorrelatedSubQuery(subQuery, bb, converted.left, false)) {
+
+ case EXISTS:
+ // "select from emp where exists (select a from T)"
+ //
+ // is converted to the following if the sub-query is correlated:
+ //
+ // "select from emp left outer join (select AGG_TRUE() as indicator
+ // from T group by corr_var) q where q.indicator is true"
+ //
+ // If there is no correlation, the expression is replaced with a
+ // boolean indicating whether the sub-query returned 0 or >= 1 row.
+ call = (SqlBasicCall) subQuery.node;
+ query = call.operand(0);
+ if (!config.isExpand()) {
+ return;
+ }
+ converted = convertExists(query, RelOptUtil.SubQueryType.EXISTS,
+ subQuery.logic, true, null);
+ assert !converted.indicator;
+ if (convertNonCorrelatedSubQuery(subQuery, bb, converted.r, true)) {
+ return;
+ }
+ subQuery.expr = bb.register(converted.r, JoinRelType.LEFT);
return;
- }
- rel = convertToSingleValueSubq(query, converted.left);
- subQuery.expr = bb.register(rel, JoinRelType.LEFT);
- return;
- case SELECT:
- // This is used when converting multiset queries:
- //
- // select * from unnest(select multiset[deptno] from emps);
- //
- converted = convertExists(subQuery.node, RelOptUtil.SubqueryType.SCALAR, subQuery.logic, true, null);
- assert !converted.right;
- subQuery.expr = bb.register(converted.left, JoinRelType.LEFT);
- return;
+ case SCALAR_QUERY:
+ // Convert the sub-query. If it's non-correlated, convert it
+ // to a constant expression.
+ if (!config.isExpand()) {
+ return;
+ }
+ call = (SqlBasicCall) subQuery.node;
+ query = call.operand(0);
+ converted = convertExists(query, RelOptUtil.SubQueryType.SCALAR,
+ subQuery.logic, true, null);
+ assert !converted.indicator;
+ if (convertNonCorrelatedSubQuery(subQuery, bb, converted.r, false)) {
+ return;
+ }
+ rel = convertToSingleValueSubq(query, converted.r);
+ subQuery.expr = bb.register(rel, JoinRelType.LEFT);
+ return;
+
+ case SELECT:
+ // This is used when converting multiset queries:
+ //
+ // select * from unnest(select multiset[deptno] from emps);
+ //
+ converted = convertExists(subQuery.node, RelOptUtil.SubQueryType.SCALAR,
+ subQuery.logic, true, null);
+ assert !converted.indicator;
+ subQuery.expr = bb.register(converted.r, JoinRelType.LEFT);
+ return;
- default:
- throw Util.newInternal("unexpected kind of subquery :" + subQuery.node);
+ default:
+ throw Util.newInternal("unexpected kind of sub-query :" + subQuery.node);
}
}
- private RexNode translateIn(SubQuery subQuery, RelNode root, final RexNode rex) {
- switch (subQuery.logic) {
- case TRUE:
- return rexBuilder.makeLiteral(true);
+ private RexNode translateIn(RelOptUtil.Logic logic, RelNode root,
+ final RexNode rex) {
+ switch (logic) {
+ case TRUE:
+ return rexBuilder.makeLiteral(true);
+
+ case TRUE_FALSE:
+ case UNKNOWN_AS_FALSE:
+ assert rex instanceof RexRangeRef;
+ final int fieldCount = rex.getType().getFieldCount();
+ RexNode rexNode = rexBuilder.makeFieldAccess(rex, fieldCount - 1);
+ rexNode = rexBuilder.makeCall(SqlStdOperatorTable.IS_TRUE, rexNode);
+
+ // Then append the IS NOT NULL(leftKeysForIn).
+ //
+ // RexRangeRef contains the following fields:
+ // leftKeysForIn,
+ // rightKeysForIn (the original sub-query select list),
+ // nullIndicator
+ //
+ // The first two lists contain the same number of fields.
+ final int k = (fieldCount - 1) / 2;
+ for (int i = 0; i < k; i++) {
+ rexNode =
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.AND,
+ rexNode,
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.IS_NOT_NULL,
+ rexBuilder.makeFieldAccess(rex, i)));
+ }
+ return rexNode;
+
+ case TRUE_FALSE_UNKNOWN:
+ case UNKNOWN_AS_TRUE:
+ // select e.deptno,
+ // case
+ // when ct.c = 0 then false
+ // when dt.i is not null then true
+ // when e.deptno is null then null
+ // when ct.ck < ct.c then null
+ // else false
+ // end
+ // from e
+ // cross join (select count(*) as c, count(deptno) as ck from v) as ct
+ // left join (select distinct deptno, true as i from v) as dt
+ // on e.deptno = dt.deptno
+ final Join join = (Join) root;
+ final Project left = (Project) join.getLeft();
+ final RelNode leftLeft = ((Join) left.getInput()).getLeft();
+ final int leftLeftCount = leftLeft.getRowType().getFieldCount();
+ final RelDataType longType =
+ typeFactory.createSqlType(SqlTypeName.BIGINT);
+ final RexNode cRef = rexBuilder.makeInputRef(root, leftLeftCount);
+ final RexNode ckRef = rexBuilder.makeInputRef(root, leftLeftCount + 1);
+ final RexNode iRef =
+ rexBuilder.makeInputRef(root, root.getRowType().getFieldCount() - 1);
+
+ final RexLiteral zero =
+ rexBuilder.makeExactLiteral(BigDecimal.ZERO, longType);
+ final RexLiteral trueLiteral = rexBuilder.makeLiteral(true);
+ final RexLiteral falseLiteral = rexBuilder.makeLiteral(false);
+ final RexNode unknownLiteral =
+ rexBuilder.makeNullLiteral(SqlTypeName.BOOLEAN);
+
+ final ImmutableList.Builder<RexNode> args = ImmutableList.builder();
+ args.add(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, cRef, zero),
+ falseLiteral,
+ rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_NULL, iRef),
+ trueLiteral);
+ final JoinInfo joinInfo = join.analyzeCondition();
+ for (int leftKey : joinInfo.leftKeys) {
+ final RexNode kRef = rexBuilder.makeInputRef(root, leftKey);
+ args.add(rexBuilder.makeCall(SqlStdOperatorTable.IS_NULL, kRef),
+ unknownLiteral);
+ }
+ args.add(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, ckRef, cRef),
+ unknownLiteral,
+ falseLiteral);
- case UNKNOWN_AS_FALSE:
- assert rex instanceof RexRangeRef;
- final int fieldCount = rex.getType().getFieldCount();
- RexNode rexNode = rexBuilder.makeFieldAccess(rex, fieldCount - 1);
- rexNode = rexBuilder.makeCall(SqlStdOperatorTable.IS_TRUE, rexNode);
-
- // Then append the IS NOT NULL(leftKeysForIn).
- //
- // RexRangeRef contains the following fields:
- // leftKeysForIn,
- // rightKeysForIn (the original subquery select list),
- // nullIndicator
- //
- // The first two lists contain the same number of fields.
- final int k = (fieldCount - 1) / 2;
- for (int i = 0; i < k; i++) {
- rexNode = rexBuilder.makeCall(SqlStdOperatorTable.AND, rexNode, rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_NULL, rexBuilder.makeFieldAccess(rex, i)));
- }
- return rexNode;
-
- case TRUE_FALSE_UNKNOWN:
- case UNKNOWN_AS_TRUE:
- // select e.deptno,
- // case
- // when ct.c = 0 then false
- // when dt.i is not null then true
- // when e.deptno is null then null
- // when ct.ck < ct.c then null
- // else false
- // end
- // from e
- // cross join (select count(*) as c, count(deptno) as ck from v) as ct
- // left join (select distinct deptno, true as i from v) as dt
- // on e.deptno = dt.deptno
- final Join join = (Join) root;
- final Project left = (Project) join.getLeft();
- final RelNode leftLeft = ((Join) left.getInput()).getLeft();
- final int leftLeftCount = leftLeft.getRowType().getFieldCount();
- final RelDataType nullableBooleanType = typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BOOLEAN), true);
- final RelDataType longType = typeFactory.createSqlType(SqlTypeName.BIGINT);
- final RexNode cRef = rexBuilder.makeInputRef(root, leftLeftCount);
- final RexNode ckRef = rexBuilder.makeInputRef(root, leftLeftCount + 1);
- final RexNode iRef = rexBuilder.makeInputRef(root, root.getRowType().getFieldCount() - 1);
-
- final RexLiteral zero = rexBuilder.makeExactLiteral(BigDecimal.ZERO, longType);
- final RexLiteral trueLiteral = rexBuilder.makeLiteral(true);
- final RexLiteral falseLiteral = rexBuilder.makeLiteral(false);
- final RexNode unknownLiteral = rexBuilder.makeNullLiteral(SqlTypeName.BOOLEAN);
-
- final ImmutableList.Builder<RexNode> args = ImmutableList.builder();
- args.add(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, cRef, zero), falseLiteral, rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_NULL, iRef), trueLiteral);
- final JoinInfo joinInfo = join.analyzeCondition();
- for (int leftKey : joinInfo.leftKeys) {
- final RexNode kRef = rexBuilder.makeInputRef(root, leftKey);
- args.add(rexBuilder.makeCall(SqlStdOperatorTable.IS_NULL, kRef), unknownLiteral);
- }
- args.add(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, ckRef, cRef), unknownLiteral, falseLiteral);
+ return rexBuilder.makeCall(SqlStdOperatorTable.CASE, args.build());
- return rexBuilder.makeCall(nullableBooleanType, SqlStdOperatorTable.CASE, args.build());
+ default:
+ throw new AssertionError(logic);
+ }
+ }
- default:
- throw new AssertionError(subQuery.logic);
+ private static boolean containsNullLiteral(SqlNodeList valueList) {
+ for (SqlNode node : valueList.getList()) {
+ if (node instanceof SqlLiteral) {
+ SqlLiteral lit = (SqlLiteral) node;
+ if (lit.getValue() == null) {
+ return true;
+ }
+ }
}
+ return false;
}
/**
- * Determines if a subquery is non-correlated and if so, converts it to a
+ * Determines if a sub-query is non-correlated and if so, converts it to a
* constant.
*
- * @param subQuery the call that references the subquery
- * @param bb blackboard used to convert the subquery
- * @param converted RelNode tree corresponding to the subquery
- * @param isExists true if the subquery is part of an EXISTS expression
- * @return if the subquery can be converted to a constant
+ * @param subQuery the call that references the sub-query
+ * @param bb blackboard used to convert the sub-query
+ * @param converted RelNode tree corresponding to the sub-query
+ * @param isExists true if the sub-query is part of an EXISTS expression
+ * @return Whether the sub-query can be converted to a constant
*/
- private boolean convertNonCorrelatedSubQuery(SubQuery subQuery, Blackboard bb, RelNode converted, boolean isExists) {
+ private boolean convertNonCorrelatedSubQuery(
+ SubQuery subQuery,
+ Blackboard bb,
+ RelNode converted,
+ boolean isExists) {
SqlCall call = (SqlBasicCall) subQuery.node;
- if (subqueryConverter.canConvertSubquery() && isSubQueryNonCorrelated(converted, bb)) {
- // First check if the subquery has already been converted
- // because it's a nested subquery. If so, don't re-evaluate
+ if (subQueryConverter.canConvertSubQuery()
+ && isSubQueryNonCorrelated(converted, bb)) {
+ // First check if the sub-query has already been converted
+ // because it's a nested sub-query. If so, don't re-evaluate
// it again.
RexNode constExpr = mapConvertedNonCorrSubqs.get(call);
if (constExpr == null) {
- constExpr = subqueryConverter.convertSubquery(call, this, isExists, config.isExplain());
+ constExpr =
+ subQueryConverter.convertSubQuery(
+ call,
+ this,
+ isExists,
+ config.isExplain());
}
if (constExpr != null) {
subQuery.expr = constExpr;
@@ -1213,14 +1399,17 @@ public class SqlToRelConverter {
* @param plan the original RelNode tree corresponding to the statement
* @return the converted RelNode tree
*/
- public RelNode convertToSingleValueSubq(SqlNode query, RelNode plan) {
+ public RelNode convertToSingleValueSubq(
+ SqlNode query,
+ RelNode plan) {
// Check whether query is guaranteed to produce a single value.
if (query instanceof SqlSelect) {
SqlSelect select = (SqlSelect) query;
SqlNodeList selectList = select.getSelectList();
SqlNodeList groupList = select.getGroup();
- if ((selectList.size() == 1) && ((groupList == null) || (groupList.size() == 0))) {
+ if ((selectList.size() == 1)
+ && ((groupList == null) || (groupList.size() == 0))) {
SqlNode selectExpr = selectList.get(0);
if (selectExpr instanceof SqlCall) {
SqlCall selectExprCall = (SqlCall) selectExpr;
@@ -1231,7 +1420,8 @@ public class SqlToRelConverter {
// If there is a limit with 0 or 1,
// it is ensured to produce a single value
- if (select.getFetch() != null && select.getFetch() instanceof SqlNumericLiteral) {
+ if (select.getFetch() != null
+ && select.getFetch() instanceof SqlNumericLiteral) {
SqlNumericLiteral limitNum = (SqlNumericLiteral) select.getFetch();
if (((BigDecimal) limitNum.getValue()).intValue() < 2) {
return plan;
@@ -1243,13 +1433,17 @@ public class SqlToRelConverter {
// it is necessary to look into the operands to determine
// whether SingleValueAgg is necessary
SqlCall exprCall = (SqlCall) query;
- if (exprCall.getOperator() instanceof SqlValuesOperator && Util.isSingleValue(exprCall)) {
+ if (exprCall.getOperator()
+ instanceof SqlValuesOperator
+ && Util.isSingleValue(exprCall)) {
return plan;
}
}
// If not, project SingleValueAgg
- return RelOptUtil.createSingleValueAggRel(cluster, plan);
+ return RelOptUtil.createSingleValueAggRel(
+ cluster,
+ plan);
}
/**
@@ -1260,30 +1454,52 @@ public class SqlToRelConverter {
* @param isNotIn is this a NOT IN operator
* @return converted expression
*/
- private RexNode convertInToOr(final Blackboard bb, final List<RexNode> leftKeys, SqlNodeList valuesList, boolean isNotIn) {
+ private RexNode convertInToOr(
+ final Blackboard bb,
+ final List<RexNode> leftKeys,
+ SqlNodeList valuesList,
+ boolean isNotIn) {
final List<RexNode> comparisons = new ArrayList<>();
for (SqlNode rightVals : valuesList) {
RexNode rexComparison;
if (leftKeys.size() == 1) {
- rexComparison = rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, leftKeys.get(0), ensureSqlType(leftKeys.get(0).getType(), bb.convertExpression(rightVals)));
+ rexComparison =
+ rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+ leftKeys.get(0),
+ ensureSqlType(leftKeys.get(0).getType(),
+ bb.convertExpression(rightVals)));
} else {
assert rightVals instanceof SqlCall;
final SqlBasicCall call = (SqlBasicCall) rightVals;
- assert (call.getOperator() instanceof SqlRowOperator) && call.operandCount() == leftKeys.size();
- rexComparison = RexUtil.composeConjunction(rexBuilder, Iterables.transform(Pair.zip(leftKeys, call.getOperandList()), new Function<Pair<RexNode, SqlNode>, RexNode>() {
- public RexNode apply(Pair<RexNode, SqlNode> pair) {
- return rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, pair.left, ensureSqlType(pair.left.getType(), bb.convertExpression(pair.right)));
- }
- }), false);
+ assert (call.getOperator() instanceof SqlRowOperator)
+ && call.operandCount() == leftKeys.size();
+ rexComparison =
+ RexUtil.composeConjunction(
+ rexBuilder,
+ Iterables.transform(
+ Pair.zip(leftKeys, call.getOperandList()),
+ new Function<Pair<RexNode, SqlNode>, RexNode>() {
+ public RexNode apply(Pair<RexNode, SqlNode> pair) {
+ return rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+ pair.left,
+ ensureSqlType(pair.left.getType(),
+ bb.convertExpression(pair.right)));
+ }
+ }),
+ false);
}
comparisons.add(rexComparison);
}
- RexNode result = RexUtil.composeDisjunction(rexBuilder, comparisons, true);
+ RexNode result =
+ RexUtil.composeDisjunction(rexBuilder, comparisons, true);
assert result != null;
if (isNotIn) {
- result = rexBuilder.makeCall(SqlStdOperatorTable.NOT, result);
+ result =
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.NOT,
+ result);
}
return result;
@@ -1293,7 +1509,9 @@ public class SqlToRelConverter {
* cast if necessary. If the expression already has the right type family,
* returns the expression unchanged. */
private RexNode ensureSqlType(RelDataType type, RexNode node) {
- if (type.getSqlTypeName() == node.getType().getSqlTypeName() || (type.getSqlTypeName() == SqlTypeName.VARCHAR && node.getType().getSqlTypeName() == SqlTypeName.CHAR)) {
+ if (type.getSqlTypeName() == node.getType().getSqlTypeName()
+ || (type.getSqlTypeName() == SqlTypeName.VARCHAR
+ && node.getType().getSqlTypeName() == SqlTypeName.CHAR)) {
return node;
}
return rexBuilder.ensureType(type, node, true);
@@ -1307,17 +1525,17 @@ public class SqlToRelConverter {
* predicate. A threshold of 0 forces usage of an inline table in all cases; a
* threshold of Integer.MAX_VALUE forces usage of OR in all cases
*
- * @return threshold, default {@link #DEFAULT_IN_SUBQUERY_THRESHOLD}
+ * @return threshold, default {@link #DEFAULT_IN_SUB_QUERY_THRESHOLD}
*/
@Deprecated // to be removed before 2.0
protected int getInSubqueryThreshold() {
- //return config.getInSubqueryThreshold();
- /* OVERRIDE POINT */
+ //return config.getInSubQueryThreshold();
+ /* OVERRIDE POINT */
return Integer.MAX_VALUE;
}
/**
- * Converts an EXISTS or IN predicate into a join. For EXISTS, the subquery
+ * Converts an EXISTS or IN predicate into a join. For EXISTS, the sub-query
* produces an indicator variable, and the result is a relational expression
* which outer joins that indicator to the original query. After performing
* the outer join, the condition will be TRUE if the EXISTS condition holds,
@@ -1325,23 +1543,34 @@ public class SqlToRelConverter {
*
* @param seek A query, for example 'select * from emp' or
* 'values (1,2,3)' or '('Foo', 34)'.
- * @param subqueryType Whether sub-query is IN, EXISTS or scalar
+ * @param subQueryType Whether sub-query is IN, EXISTS or scalar
* @param logic Whether the answer needs to be in full 3-valued logic (TRUE,
* FALSE, UNKNOWN) will be required, or whether we can accept an
* approximation (say representing UNKNOWN as FALSE)
- * @param needsOuterJoin Whether an outer join is needed
+ * @param notIn Whether the operation is NOT IN
* @return join expression
* @pre extraExpr == null || extraName != null
*/
- private Pair<RelNode, Boolean> convertExists(SqlNode seek, RelOptUtil.SubqueryType subqueryType, RelOptUtil.Logic logic, boolean needsOuterJoin, RelDataType targetDataType) {
- final SqlValidatorScope seekScope = (seek instanceof SqlSelect) ? validator.getSelectScope((SqlSelect) seek) : null;
+ private RelOptUtil.Exists convertExists(
+ SqlNode seek,
+ RelOptUtil.SubQueryType subQueryType,
+ RelOptUtil.Logic logic,
+ boolean notIn,
+ RelDataType targetDataType) {
+ final SqlValidatorScope seekScope =
+ (seek instanceof SqlSelect)
+ ? validator.getSelectScope((SqlSelect) seek)
+ : null;
final Blackboard seekBb = createBlackboard(seekScope, null, false);
RelNode seekRel = convertQueryOrInList(seekBb, seek, targetDataType);
- return RelOptUtil.createExistsPlan(seekRel, subqueryType, logic, needsOuterJoin);
+ return RelOptUtil.createExistsPlan(seekRel, subQueryType, logic, notIn);
}
- private RelNode convertQueryOrInList(Blackboard bb, SqlNode seek, RelDataType targetRowType) {
+ private RelNode convertQueryOrInList(
+ Blackboard bb,
+ SqlNode seek,
+ RelDataType targetRowType) {
// NOTE: Once we start accepting single-row queries as row constructors,
// there will be an ambiguity here for a case like X IN ((SELECT Y FROM
// Z)). The SQL standard resolves the ambiguity by saying that a lone
@@ -1349,25 +1578,40 @@ public class SqlToRelConverter {
// expression. The semantic difference is that a table expression can
// return multiple rows.
if (seek instanceof SqlNodeList) {
- return convertRowValues(bb, seek, ((SqlNodeList) seek).getList(), false, targetRowType);
+ return convertRowValues(
+ bb,
+ seek,
+ ((SqlNodeList) seek).getList(),
+ false,
+ targetRowType);
} else {
return convertQueryRecursive(seek, false, null).project();
}
}
- private RelNode convertRowValues(Blackboard bb, SqlNode rowList, Collection<SqlNode> rows, boolean allowLiteralsOnly, RelDataType targetRowType) {
+ private RelNode convertRowValues(
+ Blackboard bb,
+ SqlNode rowList,
+ Collection<SqlNode> rows,
+ boolean allowLiteralsOnly,
+ RelDataType targetRowType) {
// NOTE jvs 30-Apr-2006: We combine all rows consisting entirely of
// literals into a single LogicalValues; this gives the optimizer a smaller
// input tree. For everything else (computed expressions, row
- // subqueries), we union each row in as a projection on top of a
+ // sub-queries), we union each row in as a projection on top of a
// LogicalOneRow.
- final ImmutableList.Builder<ImmutableList<RexLiteral>> tupleList = ImmutableList.builder();
+ final ImmutableList.Builder<ImmutableList<RexLiteral>> tupleList =
+ ImmutableList.builder();
final RelDataType rowType;
if (targetRowType != null) {
rowType = targetRowType;
} else {
- rowType = SqlTypeUtil.promoteToRowType(typeFactory, validator.getValidatedNodeType(rowList), null);
+ rowType =
+ SqlTypeUtil.promoteToRowType(
+ typeFactory,
+ validator.getValidatedNodeType(rowList),
+ null);
}
final List<RelNode> unionInputs = new ArrayList<>();
@@ -1377,7 +1621,12 @@ public class SqlToRelConverter {
call = (SqlBasicCall) node;
ImmutableList.Builder<RexLiteral> tuple = ImmutableList.builder();
for (Ord<SqlNode> operand : Ord.zip(call.operands)) {
- RexLiteral rexLiteral = convertLiteralInValuesList(operand.e, bb, rowType, operand.i);
+ RexLiteral rexLiteral =
+ convertLiteralInValuesList(
+ operand.e,
+ bb,
+ rowType,
+ operand.i);
if ((rexLiteral == null) && allowLiteralsOnly) {
return null;
}
@@ -1393,7 +1642,12 @@ public class SqlToRelConverter {
continue;
}
} else {
- RexLiteral rexLiteral = convertLiteralInValuesList(node, bb, rowType, 0);
+ RexLiteral rexLiteral =
+ convertLiteralInValuesList(
+ node,
+ bb,
+ rowType,
+ 0);
if ((rexLiteral != null) && config.isCreateValuesRel()) {
tupleList.add(ImmutableList.of(rexLiteral));
continue;
@@ -1404,11 +1658,15 @@ public class SqlToRelConverter {
}
// convert "1" to "row(1)"
- call = (SqlBasicCall) SqlStdOperatorTable.ROW.createCall(SqlParserPos.ZERO, node);
+ call =
+ (SqlBasicCall) SqlStdOperatorTable.ROW.createCall(
+ SqlParserPos.ZERO,
+ node);
}
unionInputs.add(convertRowConstructor(bb, call));
}
- LogicalValues values = LogicalValues.create(cluster, rowType, tupleList.build());
+ LogicalValues values =
+ LogicalValues.create(cluster, rowType, tupleList.build());
RelNode resultRel;
if (unionInputs.isEmpty()) {
resultRel = values;
@@ -1422,7 +1680,11 @@ public class SqlToRelConverter {
return resultRel;
}
- private RexLiteral convertLiteralInValuesList(SqlNode sqlNode, Blackboard bb, RelDataType rowType, int iField) {
+ private RexLiteral convertLiteralInValuesList(
+ SqlNode sqlNode,
+ Blackboard bb,
+ RelDataType rowType,
+ int iField) {
if (!(sqlNode instanceof SqlLiteral)) {
return null;
}
@@ -1435,7 +1697,10 @@ public class SqlToRelConverter {
return null;
}
- RexNode literalExpr = exprConverter.convertLiteral(bb, (SqlLiteral) sqlNode);
+ RexNode literalExpr =
+ exprConverter.convertLiteral(
+ bb,
+ (SqlLiteral) sqlNode);
if (!(literalExpr instanceof RexLiteral)) {
assert literalExpr.isA(SqlKind.CAST);
@@ -1452,14 +1717,24 @@ public class SqlToRelConverter {
Comparable value = literal.getValue();
if (SqlTypeUtil.isExactNumeric(type) && SqlTypeUtil.hasScale(type)) {
- BigDecimal roundedValue = NumberUtil.rescaleBigDecimal((BigDecimal) value, type.getScale());
- return rexBuilder.makeExactLiteral(roundedValue, type);
+ BigDecimal roundedValue =
+ NumberUtil.rescaleBigDecimal(
+ (BigDecimal) value,
+ type.getScale());
+ return rexBuilder.makeExactLiteral(
+ roundedValue,
+ type);
}
- if ((value instanceof NlsString) && (type.getSqlTypeName() == SqlTypeName.CHAR)) {
+ if ((value instanceof NlsString)
+ && (type.getSqlTypeName() == SqlTypeName.CHAR)) {
// pad fixed character type
NlsString unpadded = (NlsString) value;
- return rexBuilder.makeCharLiteral(new NlsString(Spaces.padRight(unpadded.getValue(), type.getPrecision()), unpadded.getCharsetName(), unpadded.getCollation()));
+ return rexBuilder.makeCharLiteral(
+ new NlsString(
+ Spaces.padRight(unpadded.getValue(), type.getPrecision()),
+ unpadded.getCharsetName(),
+ unpadded.getCollation()));
}
return literal;
}
@@ -1481,67 +1756,78 @@ public class SqlToRelConverter {
* @param logic Whether the answer needs to be in full 3-valued logic (TRUE,
* FALSE, UNKNOWN) will be required, or whether we can accept
* an approximation (say representing UNKNOWN as FALSE)
- * @param registerOnlyScalarSubqueries if set to true and the parse tree
+ * @param registerOnlyScalarSubQueries if set to true and the parse tree
* corresponds to a variation of a select
* node, only register it if it's a scalar
- * subquery
+ * sub-query
*/
- private void findSubqueries(Blackboard bb, SqlNode node, RelOptUtil.Logic logic, boolean registerOnlyScalarSubqueries) {
+ private void findSubQueries(
+ Blackboard bb,
+ SqlNode node,
+ RelOptUtil.Logic logic,
+ boolean registerOnlyScalarSubQueries) {
final SqlKind kind = node.getKind();
switch (kind) {
- case EXISTS:
- case SELECT:
- case MULTISET_QUERY_CONSTRUCTOR:
- case MULTISET_VALUE_CONSTRUCTOR:
- case ARRAY_QUERY_CONSTRUCTOR:
- case CURSOR:
- case SCALAR_QUERY:
- if (!registerOnlyScalarSubqueries || (kind == SqlKind.SCALAR_QUERY)) {
- bb.registerSubquery(node, RelOptUtil.Logic.TRUE_FALSE);
- }
- return;
- case IN:
- if (((SqlCall) node).getOp
<TRUNCATED>
[10/10] kylin git commit: KYLIN-2394 Upgrade Calcite to 1.11 and
Avatica to 1.9.0
Posted by bi...@apache.org.
KYLIN-2394 Upgrade Calcite to 1.11 and Avatica to 1.9.0
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/dfb5fac9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/dfb5fac9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/dfb5fac9
Branch: refs/heads/KYLIN-2394
Commit: dfb5fac9f0d9c13f1128206b73e683b515fe3d72
Parents: 61833d9
Author: Billy Liu <bi...@apache.org>
Authored: Thu Jan 19 14:54:20 2017 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Thu Jan 19 14:54:20 2017 +0800
----------------------------------------------------------------------
atopcalcite/pom.xml | 11 +-
.../calcite/sql2rel/SqlToRelConverter.java | 3941 +++++++++++-------
jdbc/pom.xml | 12 +-
.../java/org/apache/kylin/jdbc/KylinMeta.java | 2 +-
kylin-it/pom.xml | 14 +-
.../org/apache/kylin/jdbc/ITJDBCDriverTest.java | 1 +
pom.xml | 10 +-
query/pom.xml | 10 +-
8 files changed, 2430 insertions(+), 1571 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/dfb5fac9/atopcalcite/pom.xml
----------------------------------------------------------------------
diff --git a/atopcalcite/pom.xml b/atopcalcite/pom.xml
index b916df2..1b327fe 100644
--- a/atopcalcite/pom.xml
+++ b/atopcalcite/pom.xml
@@ -36,7 +36,16 @@
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica</artifactId>
</dependency>
</dependencies>
-
</project>
[03/10] kylin git commit: KYLIN-2404 Add "hive.merge.mapfiles" and
"hive.merge.mapredfiles" to kylin_hive_conf.xml
Posted by bi...@apache.org.
KYLIN-2404 Add "hive.merge.mapfiles" and "hive.merge.mapredfiles" to kylin_hive_conf.xml
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/131a3f3f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/131a3f3f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/131a3f3f
Branch: refs/heads/KYLIN-2394
Commit: 131a3f3f0ca3fbcc8000188b9973c27406bd9a90
Parents: 6e30376
Author: shaofengshi <sh...@apache.org>
Authored: Wed Jan 18 09:33:19 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Jan 18 16:28:05 2017 +0800
----------------------------------------------------------------------
build/conf/kylin_hive_conf.xml | 11 +++++++++++
1 file changed, 11 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/131a3f3f/build/conf/kylin_hive_conf.xml
----------------------------------------------------------------------
diff --git a/build/conf/kylin_hive_conf.xml b/build/conf/kylin_hive_conf.xml
index 2fc6dab..f01d08e 100644
--- a/build/conf/kylin_hive_conf.xml
+++ b/build/conf/kylin_hive_conf.xml
@@ -88,4 +88,15 @@
<description>Collect statistics for newly created intermediate table</description>
</property>
+ <property>
+ <name>hive.merge.mapfiles</name>
+ <value>false</value>
+ <description>Disable Hive's auto merge</description>
+ </property>
+
+ <property>
+ <name>hive.merge.mapredfiles</name>
+ <value>false</value>
+ <description>Disable Hive's auto merge</description>
+ </property>
</configuration>