You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/27 11:21:12 UTC
[10/52] [abbrv] incubator-kylin git commit: KYLIN-875 Refactor
core-common, remove dependency on hadoop/hbase
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java b/core-common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java
deleted file mode 100644
index fb0d313..0000000
--- a/core-common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.util;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.HBaseResourceStore;
-
-/**
- * a helper class to start and shutdown hbase mini cluster
- *
- * @author shaoshi
- */
-public class HBaseMiniclusterHelper {
-
- public static final String SHARED_STORAGE_PREFIX = "KYLIN_";
- public static final String CUBE_STORAGE_PREFIX = "KYLIN_";
- public static final String II_STORAGE_PREFIX = "KYLIN_II_";
- public static final String TEST_METADATA_TABLE = "kylin_metadata";
-
- private static final String hbaseTarLocation = "../examples/test_case_data/minicluster/hbase-export.tar.gz";
- private static final String iiEndpointClassName = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
-
- public static HBaseTestingUtility UTIL = new HBaseTestingUtility();
- private static volatile boolean clusterStarted = false;
- private static String hbaseconnectionUrl = "";
-
- private static final Log logger = LogFactory.getLog(HBaseMiniclusterHelper.class);
-
- static {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- shutdownMiniCluster();
- }
- });
- }
-
- /**
- * Start the minicluster; Sub-classes should invoke this in BeforeClass method.
- *
- * @throws Exception
- */
- public static void startupMinicluster() throws Exception {
-
- if (!clusterStarted) {
- synchronized (HBaseMiniclusterHelper.class) {
- if (!clusterStarted) {
- startupMiniClusterAndImportData();
- clusterStarted = true;
- }
- }
- } else {
- updateKylinConfigWithMinicluster();
- }
- }
-
- private static void updateKylinConfigWithMinicluster() {
-
- KylinConfig.getInstanceFromEnv().setMetadataUrl(TEST_METADATA_TABLE + "@" + hbaseconnectionUrl);
- KylinConfig.getInstanceFromEnv().setStorageUrl(hbaseconnectionUrl);
- }
-
- private static void startupMiniClusterAndImportData() throws Exception {
-
- logger.info("Going to start mini cluster.");
-
- if (existInClassPath(iiEndpointClassName)) {
- HBaseMiniclusterHelper.UTIL.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, iiEndpointClassName);
- }
-
- //https://issues.apache.org/jira/browse/HBASE-11711
- UTIL.getConfiguration().setInt("hbase.master.info.port", -1);//avoid port clobbering
-
- MiniHBaseCluster hbaseCluster = UTIL.startMiniCluster();
-
- Configuration config = hbaseCluster.getConf();
- String host = config.get(HConstants.ZOOKEEPER_QUORUM);
- String port = config.get(HConstants.ZOOKEEPER_CLIENT_PORT);
- String parent = config.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
-
- // see in: https://hbase.apache.org/book.html#trouble.rs.runtime.zkexpired
- config.set("zookeeper.session.timeout", "1200000");
- config.set("hbase.zookeeper.property.tickTime", "6000");
- // reduce rpc retry
- config.set(HConstants.HBASE_CLIENT_PAUSE, "3000");
- config.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "1");
- config.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
-
- hbaseconnectionUrl = "hbase:" + host + ":" + port + ":" + parent;
- updateKylinConfigWithMinicluster();
-
- UTIL.startMiniMapReduceCluster();
-
- // create the metadata htables;
- @SuppressWarnings("unused")
- HBaseResourceStore store = new HBaseResourceStore(KylinConfig.getInstanceFromEnv());
-
- // import the table content
- HbaseImporter.importHBaseData(hbaseTarLocation, UTIL.getConfiguration());
-
- }
-
- private static boolean existInClassPath(String className) {
- try {
- Class.forName(className);
- } catch (ClassNotFoundException e) {
- return false;
- }
- return true;
- }
-
- /**
- * Shutdown the minicluster;
- */
- public static void shutdownMiniCluster() {
-
- logger.info("Going to shutdown mini cluster.");
-
- try {
- UTIL.shutdownMiniMapReduceCluster();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- try {
- UTIL.shutdownMiniCluster();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public static void main(String[] args) {
- HBaseMiniclusterHelper t = new HBaseMiniclusterHelper();
- logger.info(t);
- try {
- HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.MINICLUSTER_TEST_DATA);
- HBaseMiniclusterHelper.startupMinicluster();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- HBaseMiniclusterHelper.shutdownMiniCluster();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-common/src/test/java/org/apache/kylin/common/util/HbaseImporter.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/HbaseImporter.java b/core-common/src/test/java/org/apache/kylin/common/util/HbaseImporter.java
deleted file mode 100644
index 1647d54..0000000
--- a/core-common/src/test/java/org/apache/kylin/common/util/HbaseImporter.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.util;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.mapreduce.Import;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.HBaseConnection;
-
-import com.google.common.base.Preconditions;
-
-/**
- */
-public class HbaseImporter {
-
- private static final Log logger = LogFactory.getLog(HbaseImporter.class);
-
- public static void importHBaseData(String hbaseTarLocation, Configuration conf) throws IOException, ClassNotFoundException, InterruptedException {
-
- if (System.getenv("JAVA_HOME") == null) {
- logger.error("Didn't find $JAVA_HOME, this will cause HBase data import failed. Please set $JAVA_HOME.");
- logger.error("Skipping table import...");
- return;
- }
-
- File exportFile = new File(hbaseTarLocation);
- if (!exportFile.exists()) {
- logger.error("Didn't find the export achieve file on " + exportFile.getAbsolutePath());
- return;
- }
-
- File folder = new File("/tmp/hbase-export/");
- if (folder.exists()) {
- FileUtils.deleteDirectory(folder);
- }
- folder.mkdirs();
- folder.deleteOnExit();
-
- //TarGZUtil.uncompressTarGZ(exportFile, folder);
- FileUtil.unTar(exportFile, folder);
- String[] child = folder.list();
- Preconditions.checkState(child.length == 1);
- String backupFolderName = child[0];
- File backupFolder = new File(folder, backupFolderName);
- String[] tableNames = backupFolder.list();
-
- for (String table : tableNames) {
-
- if (!(table.equalsIgnoreCase(HBaseMiniclusterHelper.TEST_METADATA_TABLE) || table.startsWith(HBaseMiniclusterHelper.SHARED_STORAGE_PREFIX))) {
- continue;
- }
-
- // create the htable; otherwise the import will fail.
- if (table.startsWith(HBaseMiniclusterHelper.II_STORAGE_PREFIX)) {
- HBaseConnection.createHTableIfNeeded(KylinConfig.getInstanceFromEnv().getStorageUrl(), table, "f");
- } else if (table.startsWith(HBaseMiniclusterHelper.CUBE_STORAGE_PREFIX)) {
- HBaseConnection.createHTableIfNeeded(KylinConfig.getInstanceFromEnv().getStorageUrl(), table, "F1", "F2");
- }
-
- // directly import from local fs, no need to copy to hdfs
- String importLocation = "file://" + backupFolder.getAbsolutePath() + "/" + table;
- String[] args = new String[] { table, importLocation };
- boolean result = runImport(args, conf);
- logger.info("importing table '" + table + "' with result:" + result);
-
- if (!result)
- break;
- }
-
- }
-
- private static boolean runImport(String[] args, Configuration configuration) throws IOException, InterruptedException, ClassNotFoundException {
- // need to make a copy of the configuration because to make sure different temp dirs are used.
- GenericOptionsParser opts = new GenericOptionsParser(new Configuration(configuration), args);
- Configuration newConf = opts.getConfiguration();
- args = opts.getRemainingArgs();
- Job job = Import.createSubmittableJob(newConf, args);
- job.waitForCompletion(false);
- return job.isSuccessful();
- }
-
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- if (args.length != 1) {
- logger.error("Usage: HbaseImporter hbase_tar_lcoation");
- System.exit(-1);
- }
-
- logger.info("The KylinConfig being used:");
- logger.info("=================================================");
- KylinConfig.getInstanceFromEnv().printProperties();
- logger.info("=================================================");
-
- importHBaseData(args[0], HBaseConfiguration.create());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-common/src/test/java/org/apache/kylin/common/util/IdentityUtilTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/IdentityUtilTest.java b/core-common/src/test/java/org/apache/kylin/common/util/IdentityUtilTest.java
index 7e802a4..452c5e6 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/IdentityUtilTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/IdentityUtilTest.java
@@ -1,11 +1,11 @@
package org.apache.kylin.common.util;
-import com.google.common.collect.Lists;
-import org.apache.commons.collections.CollectionUtils;
+import java.util.List;
+
import org.junit.Assert;
import org.junit.Test;
-import java.util.List;
+import com.google.common.collect.Lists;
/**
*/
@@ -20,9 +20,6 @@ public class IdentityUtilTest {
List<String> c2 = Lists.newArrayList(s2);
List<String> c3 = Lists.newArrayList(s2);
- Assert.assertTrue(CollectionUtils.isEqualCollection(c1,c2));
- Assert.assertTrue(CollectionUtils.isEqualCollection(c3,c2));
-
Assert.assertFalse(IdentityUtils.collectionReferenceEquals(c1,c2));
Assert.assertTrue(IdentityUtils.collectionReferenceEquals(c3,c2));
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-common/src/test/java/org/apache/kylin/common/util/RandomSamplerTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/RandomSamplerTest.java b/core-common/src/test/java/org/apache/kylin/common/util/RandomSamplerTest.java
index 89d5701..1cf1e8b 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/RandomSamplerTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/RandomSamplerTest.java
@@ -23,24 +23,21 @@ import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.io.Text;
import org.junit.Test;
/**
- * @author ysong1
- *
*/
public class RandomSamplerTest {
@Test
public void test() {
- RandomSampler<Text> s = new RandomSampler<Text>();
- List<Text> data = new ArrayList<Text>();
+ RandomSampler<String> s = new RandomSampler<String>();
+ List<String> data = new ArrayList<String>();
for (int i = 0; i < 1000; i++) {
- data.add(new Text(String.valueOf(i)));
+ data.add(String.valueOf(i));
}
- List<Text> result = s.sample(data, 50);
+ List<String> result = s.sample(data, 50);
System.out.println(result);
assertEquals(50, result.size());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-common/src/test/java/org/apache/kylin/common/util/RangeTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/RangeTest.java b/core-common/src/test/java/org/apache/kylin/common/util/RangeTest.java
index 2a9ce40..e18b3e7 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/RangeTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/RangeTest.java
@@ -9,6 +9,7 @@ import com.google.common.collect.Ranges;
/**
*/
+@SuppressWarnings({ "rawtypes", "unchecked" })
public class RangeTest {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-common/src/test/java/org/apache/kylin/common/util/SSHClientTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/SSHClientTest.java b/core-common/src/test/java/org/apache/kylin/common/util/SSHClientTest.java
index 5680234..3f32b86 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/SSHClientTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/SSHClientTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.commons.io.FileUtils;
import org.apache.kylin.common.KylinConfig;
import org.junit.After;
import org.junit.Before;
@@ -80,7 +80,9 @@ public class SSHClientTest extends LocalFileMetadataTestCase {
return;
SSHClient ssh = new SSHClient(this.hostname, this.port, this.username, this.password);
- File tmpFile = FileUtil.createLocalTempFile(new File("/tmp/test_scp"), "temp_", false);
+ File tmpFile = File.createTempFile("test_scp", "", new File("/tmp"));
+ tmpFile.deleteOnExit();
+ FileUtils.write(tmpFile, "test_scp");
ssh.scpFileToRemote(tmpFile.getAbsolutePath(), "/tmp");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-cube/pom.xml
----------------------------------------------------------------------
diff --git a/core-cube/pom.xml b/core-cube/pom.xml
index 711e4db..8751a78 100644
--- a/core-cube/pom.xml
+++ b/core-cube/pom.xml
@@ -43,6 +43,8 @@
<artifactId>kylin-core-dictionary</artifactId>
<version>${project.parent.version}</version>
</dependency>
+
+ <!-- Env & Test -->
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-core-common</artifactId>
@@ -50,8 +52,6 @@
<scope>test</scope>
<version>${project.parent.version}</version>
</dependency>
-
- <!-- Env & Test -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index cc61dac..6ed4100 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -44,11 +44,11 @@ import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.DistinctColumnValuesProvider;
import org.apache.kylin.dict.lookup.LookupStringTable;
import org.apache.kylin.dict.lookup.SnapshotManager;
import org.apache.kylin.dict.lookup.SnapshotTable;
import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.PartitionDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -161,13 +161,13 @@ public class CubeManager implements IRealizationProvider {
return result;
}
- public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, String factColumnsPath) throws IOException {
+ public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
CubeDesc cubeDesc = cubeSeg.getCubeDesc();
if (!cubeDesc.getRowkey().isUseDictionary(col))
return null;
DictionaryManager dictMgr = getDictionaryManager();
- DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, factColumnsPath);
+ DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, factTableValueProvider);
if (dictInfo != null) {
cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
@@ -708,8 +708,6 @@ public class CubeManager implements IRealizationProvider {
* - Favors big segments over the small
*/
private List<CubeSegment> calculateToBeSegments(CubeInstance cube, boolean strictChecking, CubeSegment... newSegments) {
- CubeDesc cubeDesc = cube.getDescriptor();
- PartitionDesc partDesc = cubeDesc.getModel().getPartitionDesc();
List<CubeSegment> tobe = Lists.newArrayList(cube.getSegments());
if (newSegments != null)
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index feb0f1a..e375a19 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -23,11 +23,11 @@ import java.io.IOException;
import org.apache.kylin.cube.model.DimensionDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.DistinctColumnValuesProvider;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
@@ -35,14 +35,14 @@ public class DictionaryGeneratorCLI {
private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
- public static void processSegment(KylinConfig config, String cubeName, String segmentName, String factColumnsPath) throws IOException {
+ public static void processSegment(KylinConfig config, String cubeName, String segmentName, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
CubeSegment segment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
- processSegment(config, segment, factColumnsPath);
+ processSegment(config, segment, factTableValueProvider);
}
- private static void processSegment(KylinConfig config, CubeSegment cubeSeg, String factColumnsPath) throws IOException {
+ private static void processSegment(KylinConfig config, CubeSegment cubeSeg, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
CubeManager cubeMgr = CubeManager.getInstance(config);
for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
@@ -50,7 +50,7 @@ public class DictionaryGeneratorCLI {
for (TblColRef col : dim.getColumnRefs()) {
if (cubeSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
logger.info("Building dictionary for " + col);
- cubeMgr.buildDictionary(cubeSeg, col, factColumnsPath);
+ cubeMgr.buildDictionary(cubeSeg, col, factTableValueProvider);
}
}
@@ -60,8 +60,7 @@ public class DictionaryGeneratorCLI {
logger.info("Building snapshot of " + dim.getTable());
cubeMgr.buildSnapshotTable(cubeSeg, dim.getTable());
logger.info("Checking snapshot of " + dim.getTable());
- cubeMgr.getLookupTable(cubeSeg, dim); // load the table for
- // sanity check
+ cubeMgr.getLookupTable(cubeSeg, dim); // load the table for sanity check
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-dictionary/pom.xml
----------------------------------------------------------------------
diff --git a/core-dictionary/pom.xml b/core-dictionary/pom.xml
index 14b54a0..649f58b 100644
--- a/core-dictionary/pom.xml
+++ b/core-dictionary/pom.xml
@@ -40,6 +40,8 @@
<artifactId>kylin-core-metadata</artifactId>
<version>${project.parent.version}</version>
</dependency>
+
+ <!-- Env & Test -->
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-core-common</artifactId>
@@ -47,8 +49,6 @@
<scope>test</scope>
<version>${project.parent.version}</version>
</dependency>
-
- <!-- Env & Test -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 611251a..a579abf 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -18,23 +18,14 @@
package org.apache.kylin.dict;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.compress.utils.IOUtils;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.dict.lookup.FileTable;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.DataType;
@@ -195,11 +186,11 @@ public class DictionaryManager {
}
}
- public DictionaryInfo buildDictionary(DataModelDesc model, String dict, TblColRef col, String factColumnsPath) throws IOException {
+ public DictionaryInfo buildDictionary(DataModelDesc model, String dict, TblColRef col, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
logger.info("building dictionary for " + col);
- Object[] tmp = decideSourceData(model, dict, col, factColumnsPath);
+ Object[] tmp = decideSourceData(model, dict, col, factTableValueProvider);
String srcTable = (String) tmp[0];
String srcCol = (String) tmp[1];
int srcColIdx = (Integer) tmp[2];
@@ -232,93 +223,42 @@ public class DictionaryManager {
* 4. ReadableTable object
*/
- public Object[] decideSourceData(DataModelDesc model, String dict, TblColRef col, String factColumnsPath) throws IOException {
+ public Object[] decideSourceData(DataModelDesc model, String dict, TblColRef col, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
String srcTable;
String srcCol;
int srcColIdx;
ReadableTable table;
MetadataManager metaMgr = MetadataManager.getInstance(config);
- // case of full table (dict on fact table)
- if (model == null) {
- srcTable = col.getTable();
- srcCol = col.getName();
- srcColIdx = col.getColumnDesc().getZeroBasedIndex();
- int nColumns = metaMgr.getTableDesc(col.getTable()).getColumnCount();
- table = new FileTable(factColumnsPath + "/" + col.getName(), nColumns);
- return new Object[] { srcTable, srcCol, srcColIdx, table };
- }
-
// Decide source data of dictionary:
// 1. If 'useDict' specifies pre-defined data set, use that
// 2. Otherwise find a lookup table to scan through
// Note FK on fact table is supported by scan the related PK on lookup table
- //String useDict = cube.getRowkey().getDictionary(col);
-
// normal case, source from lookup table
if ("true".equals(dict) || "string".equals(dict) || "number".equals(dict) || "any".equals(dict)) {
// FK on fact table and join type is inner, use PK from lookup instead
if (model.isFactTable(col.getTable())) {
TblColRef pkCol = model.findPKByFK(col, "inner");
if (pkCol != null)
- col = pkCol; // scan the counterparty PK on lookup table
- // instead
+ col = pkCol; // scan the counterparty PK on lookup table instead
}
srcTable = col.getTable();
srcCol = col.getName();
srcColIdx = col.getColumnDesc().getZeroBasedIndex();
if (model.isFactTable(col.getTable())) {
- table = new FileTable(factColumnsPath + "/" + col.getName(), -1);
+ table = factTableValueProvider.getDistinctValuesFor(col);
} else {
table = TableSourceFactory.createReadableTable(metaMgr.getTableDesc(col.getTable()));
}
}
- // otherwise could refer to a data set, e.g. common_indicators.txt
- // (LEGACY PATH, since distinct values are collected from fact table)
- else {
- String dictDataSetPath = unpackDataSet(this.config.getTempHDFSDir(), dict);
- if (dictDataSetPath == null)
- throw new IllegalArgumentException("Unknown dictionary data set '" + dict + "', referred from " + col);
- srcTable = "PREDEFINED";
- srcCol = dict;
- srcColIdx = 0;
- table = new FileTable(dictDataSetPath, -1);
- }
+ else
+ throw new IllegalArgumentException("Unknown dictionary value: " + dict);
return new Object[] { srcTable, srcCol, srcColIdx, table };
}
- private String unpackDataSet(String tempHDFSDir, String dataSetName) throws IOException {
-
- InputStream in = this.getClass().getResourceAsStream("/org/apache/kylin/dict/" + dataSetName + ".txt");
- if (in == null) // data set resource not found
- return null;
-
- ByteArrayOutputStream buf = new ByteArrayOutputStream();
- IOUtils.copy(in, buf);
- in.close();
- byte[] bytes = buf.toByteArray();
-
- Path tmpDataSetPath = new Path(tempHDFSDir + "/dict/temp_dataset/" + dataSetName + "_" + bytes.length + ".txt");
-
- FileSystem fs = HadoopUtil.getFileSystem(tempHDFSDir);
- boolean writtenNewFile = false;
- if (fs.exists(tmpDataSetPath) == false || fs.getFileStatus(tmpDataSetPath).getLen() != bytes.length) {
- fs.mkdirs(tmpDataSetPath.getParent());
- FSDataOutputStream out = fs.create(tmpDataSetPath);
- IOUtils.copy(new ByteArrayInputStream(bytes), out);
- out.close();
- writtenNewFile = true;
- }
-
- String qualifiedPath = tmpDataSetPath.makeQualified(fs.getUri(), new Path("/")).toString();
- if (writtenNewFile)
- logger.info("Dictionary temp data set file written to " + qualifiedPath);
- return qualifiedPath;
- }
-
private String checkDupByInfo(DictionaryInfo dictInfo) throws IOException {
ResourceStore store = MetadataManager.getInstance(config).getStore();
ArrayList<String> existings = store.listResources(dictInfo.getResourceDir());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java
new file mode 100644
index 0000000..66511be
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java
@@ -0,0 +1,17 @@
+package org.apache.kylin.dict;
+
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable;
+
+/**
+ * To build a dictionary, we need a list of distinct values of a column.
+ * For a column on a lookup table, simply scan the whole table since the table is small.
+ * For a column on the fact table, the table is too big to iterate over, so the build
+ * engine first extracts the distinct values (by an MR job, for example) and
+ * implements this interface to provide the result to DictionaryManager.
+ */
+public interface DistinctColumnValuesProvider {
+
+ /** Returns a ReadableTable that contains only one column, each row being a distinct value. */
+ public ReadableTable getDistinctValuesFor(TblColRef col);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
deleted file mode 100644
index 59eca4a..0000000
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.source.ReadableTable;
-
-/**
- */
-public class FileTable implements ReadableTable {
-
- public static final String DELIM_AUTO = "auto";
- public static final String DELIM_COMMA = ",";
-
- String path;
- String delim;
- int nColumns;
-
- public FileTable(String path, int nColumns) {
- this(path, DELIM_AUTO, nColumns);
- }
-
- public FileTable(String path, String delim, int nColumns) {
- this.path = path;
- this.delim = delim;
- this.nColumns = nColumns;
- }
-
- public String getColumnDelimeter() {
- return delim;
- }
-
- @Override
- public TableReader getReader() throws IOException {
- return new FileTableReader(path, delim, nColumns);
- }
-
- @Override
- public TableSignature getSignature() throws IOException {
- try {
- Pair<Long, Long> sizeAndLastModified = getSizeAndLastModified(path);
- return new TableSignature(path, sizeAndLastModified.getFirst(), sizeAndLastModified.getSecond());
- } catch (FileNotFoundException ex) {
- return null;
- }
- }
-
- @Override
- public String toString() {
- return path;
- }
-
- public static Pair<Long, Long> getSizeAndLastModified(String path) throws IOException {
- FileSystem fs = HadoopUtil.getFileSystem(path);
-
- // get all contained files if path is directory
- ArrayList<FileStatus> allFiles = new ArrayList<>();
- FileStatus status = fs.getFileStatus(new Path(path));
- if (status.isFile()) {
- allFiles.add(status);
- } else {
- FileStatus[] listStatus = fs.listStatus(new Path(path));
- allFiles.addAll(Arrays.asList(listStatus));
- }
-
- long size = 0;
- long lastModified = 0;
- for (FileStatus file : allFiles) {
- size += file.getLen();
- lastModified = Math.max(lastModified, file.getModificationTime());
- }
-
- return new Pair<Long, Long>(size, lastModified);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
deleted file mode 100644
index 4e04c93..0000000
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.BufferedReader;
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.StringSplitter;
-import org.apache.kylin.source.ReadableTable.TableReader;
-
-/**
- * Tables are typically CSV or SEQ file.
- *
- * @author yangli9
- */
-public class FileTableReader implements TableReader {
-
- private static final Logger logger = LoggerFactory.getLogger(FileTableReader.class);
- private static final char CSV_QUOTE = '"';
- private static final String[] DETECT_DELIMS = new String[] { "\177", "|", "\t", "," };
-
- private String filePath;
- private String delim;
- private RowReader reader;
-
- private String curLine;
- private String[] curColumns;
- private int expectedColumnNumber = -1; // helps delimiter detection
-
- public FileTableReader(String filePath, int expectedColumnNumber) throws IOException {
- this(filePath, FileTable.DELIM_AUTO, expectedColumnNumber);
- }
-
- public FileTableReader(String filePath, String delim, int expectedColumnNumber) throws IOException {
- filePath = HadoopUtil.fixWindowsPath(filePath);
- this.filePath = filePath;
- this.delim = delim;
- this.expectedColumnNumber = expectedColumnNumber;
-
- FileSystem fs = HadoopUtil.getFileSystem(filePath);
-
- try {
- this.reader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, filePath);
-
- } catch (IOException e) {
- if (isExceptionSayingNotSeqFile(e) == false)
- throw e;
-
- this.reader = new CsvRowReader(fs, filePath);
- }
- }
-
- private boolean isExceptionSayingNotSeqFile(IOException e) {
- if (e.getMessage() != null && e.getMessage().contains("not a SequenceFile"))
- return true;
-
- if (e instanceof EOFException) // in case the file is very very small
- return true;
-
- return false;
- }
-
- @Override
- public boolean next() throws IOException {
- curLine = reader.nextLine();
- curColumns = null;
- return curLine != null;
- }
-
- public String getLine() {
- return curLine;
- }
-
- @Override
- public String[] getRow() {
- if (curColumns == null) {
- if (FileTable.DELIM_AUTO.equals(delim))
- delim = autoDetectDelim(curLine);
-
- if (delim == null)
- curColumns = new String[] { curLine };
- else
- curColumns = split(curLine, delim);
- }
- return curColumns;
- }
-
- private String[] split(String line, String delim) {
- // FIXME CVS line should be parsed considering escapes
- String str[] = StringSplitter.split(line, delim);
-
- // un-escape CSV
- if (FileTable.DELIM_COMMA.equals(delim)) {
- for (int i = 0; i < str.length; i++) {
- str[i] = unescapeCsv(str[i]);
- }
- }
-
- return str;
- }
-
- private String unescapeCsv(String str) {
- if (str == null || str.length() < 2)
- return str;
-
- str = StringEscapeUtils.unescapeCsv(str);
-
- // unescapeCsv may not remove the outer most quotes
- if (str.charAt(0) == CSV_QUOTE && str.charAt(str.length() - 1) == CSV_QUOTE)
- str = str.substring(1, str.length() - 1);
-
- return str;
- }
-
- @Override
- public void close() throws IOException {
- if (reader != null)
- reader.close();
- }
-
- private String autoDetectDelim(String line) {
- if (expectedColumnNumber > 0) {
- for (String delim : DETECT_DELIMS) {
- if (StringSplitter.split(line, delim).length == expectedColumnNumber) {
- logger.info("Auto detect delim to be '" + delim + "', split line to " + expectedColumnNumber + " columns -- " + line);
- return delim;
- }
- }
- }
-
- logger.info("Auto detect delim to be null, will take THE-WHOLE-LINE as a single value, for " + filePath);
- return null;
- }
-
- // ============================================================================
-
- private interface RowReader extends Closeable {
- String nextLine() throws IOException; // return null on EOF
- }
-
- private class SeqRowReader implements RowReader {
- Reader reader;
- Writable key;
- Text value;
-
- SeqRowReader(Configuration hconf, FileSystem fs, String path) throws IOException {
- reader = new Reader(hconf, SequenceFile.Reader.file(new Path(path)));
- key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
- value = new Text();
- }
-
- @Override
- public String nextLine() throws IOException {
- boolean hasNext = reader.next(key, value);
- if (hasNext)
- return Bytes.toString(value.getBytes(), 0, value.getLength());
- else
- return null;
- }
-
- @Override
- public void close() throws IOException {
- reader.close();
- }
- }
-
- private class CsvRowReader implements RowReader {
- BufferedReader reader;
-
- CsvRowReader(FileSystem fs, String path) throws IOException {
- FSDataInputStream in = fs.open(new Path(path));
- reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
- }
-
- @Override
- public String nextLine() throws IOException {
- return reader.readLine();
- }
-
- @Override
- public void close() throws IOException {
- reader.close();
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-dictionary/src/test/java/org/apache/kylin/dict/LookupTableTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/LookupTableTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/LookupTableTest.java
deleted file mode 100644
index 86fa635..0000000
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/LookupTableTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict;
-
-import java.io.File;
-
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.dict.lookup.FileTable;
-import org.apache.kylin.dict.lookup.LookupBytesTable;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-
-/**
- * @author yangli9
- */
-public class LookupTableTest extends LocalFileMetadataTestCase {
-
- @Before
- public void setup() throws Exception {
- createTestMetadata();
- }
-
- @After
- public void after() throws Exception {
- cleanupTestMetadata();
- }
-
- @Test
- public void testBasic() throws Exception {
- TableDesc siteTable = MetadataManager.getInstance(getTestConfig()).getTableDesc("EDW.TEST_SITES");
- TableDesc categoryTable = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.test_category_groupings");
- LookupBytesTable lookup;
-
- System.out.println("============================================================================");
-
- File f = new File(LOCALMETA_TEST_DATA + "/data/EDW.TEST_SITES.csv");
- lookup = new LookupBytesTable(siteTable, new String[] { "SITE_ID" }, new FileTable("file://" + f.getAbsolutePath(), 10));
- lookup.dump();
-
- System.out.println("============================================================================");
-
- f = new File(LOCALMETA_TEST_DATA + "/data/DEFAULT.TEST_CATEGORY_GROUPINGS.csv");
- lookup = new LookupBytesTable(categoryTable, new String[] { "leaf_categ_id", "site_id" }, new FileTable("file://" + f.getAbsolutePath(), 36));
- lookup.dump();
-
- System.out.println("============================================================================");
-
- ByteArray k1 = new ByteArray(Bytes.toBytes("533"));
- ByteArray k2 = new ByteArray(Bytes.toBytes("0"));
- Array<ByteArray> key = new Array<ByteArray>(new ByteArray[] { k1, k2 });
- System.out.println(lookup.getRow(key));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java
deleted file mode 100644
index cfecaee..0000000
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.kylin.dict.lookup.FileTable;
-import org.apache.kylin.dict.lookup.FileTableReader;
-import org.junit.Test;
-
-/**
- * @author yangli9
- *
- */
-public class TableReaderTest {
-
- @Test
- public void testBasicReader() throws IOException {
- File f = new File("src/test/resources/dict/DW_SITES");
- FileTableReader reader = new FileTableReader("file://" + f.getAbsolutePath(), FileTable.DELIM_AUTO, 10);
- while (reader.next()) {
- assertEquals("[-1, Korea Auction.co.kr, S, 48, 0, 111, 2009-02-11, , DW_OFFPLAT, ]", Arrays.toString(reader.getRow()));
- break;
- }
- reader.close();
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
new file mode 100644
index 0000000..490e3f7
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
@@ -0,0 +1,9 @@
+package org.apache.kylin.job.lock;
+
+/**
+ */
+public interface JobLock {
+ boolean lock();
+
+ void unlock();
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
new file mode 100644
index 0000000..f5cea4e
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
@@ -0,0 +1,15 @@
+package org.apache.kylin.job.lock;
+
+/**
+ */
+public class MockJobLock implements JobLock {
+ @Override
+ public boolean lock() {
+ return true;
+ }
+
+ @Override
+ public void unlock() {
+ return;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-metadata/pom.xml
----------------------------------------------------------------------
diff --git a/core-metadata/pom.xml b/core-metadata/pom.xml
index 3c4f180..71457bc 100644
--- a/core-metadata/pom.xml
+++ b/core-metadata/pom.xml
@@ -40,6 +40,8 @@
<artifactId>kylin-core-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
+
+ <!-- Env & Test -->
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-core-common</artifactId>
@@ -47,9 +49,6 @@
<scope>test</scope>
<version>${project.parent.version}</version>
</dependency>
-
- <!-- Env & Test -->
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/engine-mr/pom.xml
----------------------------------------------------------------------
diff --git a/engine-mr/pom.xml b/engine-mr/pom.xml
index 748a9df..a226e80 100644
--- a/engine-mr/pom.xml
+++ b/engine-mr/pom.xml
@@ -53,6 +53,46 @@
<!-- Env & Test -->
<dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>provided</scope>
+ <!-- protobuf version conflict with hbase -->
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
new file mode 100644
index 0000000..152b4af
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.source.ReadableTable;
+
+/**
+ */
+public class DFSFileTable implements ReadableTable {
+
+ public static final String DELIM_AUTO = "auto";
+ public static final String DELIM_COMMA = ",";
+
+ String path;
+ String delim;
+ int nColumns;
+
+ public DFSFileTable(String path, int nColumns) {
+ this(path, DELIM_AUTO, nColumns);
+ }
+
+ public DFSFileTable(String path, String delim, int nColumns) {
+ this.path = path;
+ this.delim = delim;
+ this.nColumns = nColumns;
+ }
+
+ public String getColumnDelimeter() {
+ return delim;
+ }
+
+ @Override
+ public TableReader getReader() throws IOException {
+ return new DFSFileTableReader(path, delim, nColumns);
+ }
+
+ @Override
+ public TableSignature getSignature() throws IOException {
+ try {
+ Pair<Long, Long> sizeAndLastModified = getSizeAndLastModified(path);
+ return new TableSignature(path, sizeAndLastModified.getFirst(), sizeAndLastModified.getSecond());
+ } catch (FileNotFoundException ex) {
+ return null;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return path;
+ }
+
+ public static Pair<Long, Long> getSizeAndLastModified(String path) throws IOException {
+ FileSystem fs = HadoopUtil.getFileSystem(path);
+
+ // get all contained files if path is directory
+ ArrayList<FileStatus> allFiles = new ArrayList<>();
+ FileStatus status = fs.getFileStatus(new Path(path));
+ if (status.isFile()) {
+ allFiles.add(status);
+ } else {
+ FileStatus[] listStatus = fs.listStatus(new Path(path));
+ allFiles.addAll(Arrays.asList(listStatus));
+ }
+
+ long size = 0;
+ long lastModified = 0;
+ for (FileStatus file : allFiles) {
+ size += file.getLen();
+ lastModified = Math.max(lastModified, file.getModificationTime());
+ }
+
+ return new Pair<Long, Long>(size, lastModified);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
new file mode 100644
index 0000000..51fb734
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.kylin.common.util.StringSplitter;
+import org.apache.kylin.source.ReadableTable.TableReader;
+
+/**
+ * Tables are typically CSV or SEQ files.
+ *
+ * @author yangli9
+ */
+public class DFSFileTableReader implements TableReader {
+
+ private static final Logger logger = LoggerFactory.getLogger(DFSFileTableReader.class);
+ private static final char CSV_QUOTE = '"';
+ private static final String[] DETECT_DELIMS = new String[] { "\177", "|", "\t", "," };
+
+ private String filePath;
+ private String delim;
+ private RowReader reader;
+
+ private String curLine;
+ private String[] curColumns;
+ private int expectedColumnNumber = -1; // helps delimiter detection
+
+ public DFSFileTableReader(String filePath, int expectedColumnNumber) throws IOException {
+ this(filePath, DFSFileTable.DELIM_AUTO, expectedColumnNumber);
+ }
+
+ public DFSFileTableReader(String filePath, String delim, int expectedColumnNumber) throws IOException {
+ filePath = HadoopUtil.fixWindowsPath(filePath);
+ this.filePath = filePath;
+ this.delim = delim;
+ this.expectedColumnNumber = expectedColumnNumber;
+
+ FileSystem fs = HadoopUtil.getFileSystem(filePath);
+
+ try {
+ this.reader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, filePath);
+
+ } catch (IOException e) {
+ if (isExceptionSayingNotSeqFile(e) == false)
+ throw e;
+
+ this.reader = new CsvRowReader(fs, filePath);
+ }
+ }
+
+ private boolean isExceptionSayingNotSeqFile(IOException e) {
+ if (e.getMessage() != null && e.getMessage().contains("not a SequenceFile"))
+ return true;
+
+ if (e instanceof EOFException) // in case the file is very very small
+ return true;
+
+ return false;
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ curLine = reader.nextLine();
+ curColumns = null;
+ return curLine != null;
+ }
+
+ public String getLine() {
+ return curLine;
+ }
+
+ @Override
+ public String[] getRow() {
+ if (curColumns == null) {
+ if (DFSFileTable.DELIM_AUTO.equals(delim))
+ delim = autoDetectDelim(curLine);
+
+ if (delim == null)
+ curColumns = new String[] { curLine };
+ else
+ curColumns = split(curLine, delim);
+ }
+ return curColumns;
+ }
+
+ private String[] split(String line, String delim) {
+ // FIXME CSV line should be parsed considering escapes
+ String str[] = StringSplitter.split(line, delim);
+
+ // un-escape CSV
+ if (DFSFileTable.DELIM_COMMA.equals(delim)) {
+ for (int i = 0; i < str.length; i++) {
+ str[i] = unescapeCsv(str[i]);
+ }
+ }
+
+ return str;
+ }
+
+ private String unescapeCsv(String str) {
+ if (str == null || str.length() < 2)
+ return str;
+
+ str = StringEscapeUtils.unescapeCsv(str);
+
+ // unescapeCsv may not remove the outermost quotes
+ if (str.charAt(0) == CSV_QUOTE && str.charAt(str.length() - 1) == CSV_QUOTE)
+ str = str.substring(1, str.length() - 1);
+
+ return str;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (reader != null)
+ reader.close();
+ }
+
+ private String autoDetectDelim(String line) {
+ if (expectedColumnNumber > 0) {
+ for (String delim : DETECT_DELIMS) {
+ if (StringSplitter.split(line, delim).length == expectedColumnNumber) {
+ logger.info("Auto detect delim to be '" + delim + "', split line to " + expectedColumnNumber + " columns -- " + line);
+ return delim;
+ }
+ }
+ }
+
+ logger.info("Auto detect delim to be null, will take THE-WHOLE-LINE as a single value, for " + filePath);
+ return null;
+ }
+
+ // ============================================================================
+
+ private interface RowReader extends Closeable {
+ String nextLine() throws IOException; // return null on EOF
+ }
+
+ private class SeqRowReader implements RowReader {
+ Reader reader;
+ Writable key;
+ Text value;
+
+ SeqRowReader(Configuration hconf, FileSystem fs, String path) throws IOException {
+ reader = new Reader(hconf, SequenceFile.Reader.file(new Path(path)));
+ key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
+ value = new Text();
+ }
+
+ @Override
+ public String nextLine() throws IOException {
+ boolean hasNext = reader.next(key, value);
+ if (hasNext)
+ return Bytes.toString(value.getBytes(), 0, value.getLength());
+ else
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+ }
+
+ private class CsvRowReader implements RowReader {
+ BufferedReader reader;
+
+ CsvRowReader(FileSystem fs, String path) throws IOException {
+ FSDataInputStream in = fs.open(new Path(path));
+ reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
+ }
+
+ @Override
+ public String nextLine() throws IOException {
+ return reader.readLine();
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
new file mode 100644
index 0000000..1c00993
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.Writable;
+
+public class HadoopUtil {
+
+ private static ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>();
+
+ public static void setCurrentConfiguration(Configuration conf) {
+ hadoopConfig.set(conf);
+ }
+
+ public static Configuration getCurrentConfiguration() {
+ if (hadoopConfig.get() == null) {
+ hadoopConfig.set(new Configuration());
+ }
+ return hadoopConfig.get();
+ }
+
+ public static FileSystem getFileSystem(String path) throws IOException {
+ return FileSystem.get(makeURI(path), getCurrentConfiguration());
+ }
+
+ public static URI makeURI(String filePath) {
+ try {
+ return new URI(fixWindowsPath(filePath));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("Cannot create FileSystem from URI: " + filePath, e);
+ }
+ }
+
+ public static String fixWindowsPath(String path) {
+ // fix windows path
+ if (path.startsWith("file://") && !path.startsWith("file:///") && path.contains(":\\")) {
+ path = path.replace("file://", "file:///");
+ }
+ if (path.startsWith("file:///")) {
+ path = path.replace('\\', '/');
+ }
+ return path;
+ }
+
+ public static Configuration newHadoopJobConfiguration() {
+ Configuration conf = new Configuration();
+ conf.set(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, "8");
+ return conf;
+ }
+
+ /**
+ * @param table the identifier of a Hive table, in the format {@code <db_name>.<table_name>}
+ * @return a string array with 2 elements: {"db_name", "table_name"}
+ */
+ public static String[] parseHiveTableName(String table) {
+ int cut = table.indexOf('.');
+ String database = cut >= 0 ? table.substring(0, cut).trim() : "DEFAULT";
+ String tableName = cut >= 0 ? table.substring(cut + 1).trim() : table.trim();
+
+ return new String[] { database, tableName };
+ }
+
+ public static void deletePath(Configuration conf, Path path) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
+ }
+
+ public static byte[] toBytes(Writable writable) {
+ try {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(bout);
+ writable.write(out);
+ out.close();
+ bout.close();
+ return bout.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
new file mode 100644
index 0000000..a678e70
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Thin extension of Hadoop's {@code Mapper} that adds one helper,
+ * {@code bindCurrentConfiguration}, which passes the supplied
+ * {@code Configuration} to {@code HadoopUtil.setCurrentConfiguration}.
+ *
+ * NOTE(review): this class never calls the helper itself; presumably concrete
+ * mappers invoke it (e.g. from their setup) - confirm against the subclasses.
+ */
+public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+ protected void bindCurrentConfiguration(Configuration conf) {
+ HadoopUtil.setCurrentConfiguration(conf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
new file mode 100644
index 0000000..846c849
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Thin extension of Hadoop's {@code Reducer} that adds one helper,
+ * {@code bindCurrentConfiguration}, which passes the supplied
+ * {@code Configuration} to {@code HadoopUtil.setCurrentConfiguration}.
+ *
+ * NOTE(review): this class never calls the helper itself; presumably concrete
+ * reducers invoke it (e.g. from their setup) - confirm against the subclasses.
+ */
+public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+ protected void bindCurrentConfiguration(Configuration conf) {
+ HadoopUtil.setCurrentConfiguration(conf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/engine-mr/src/test/java/org/apache/kylin/engine/mr/LookupTableTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/LookupTableTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/LookupTableTest.java
new file mode 100644
index 0000000..87f9133
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/LookupTableTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.io.File;
+
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.dict.lookup.LookupBytesTable;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author yangli9
+ */
+public class LookupTableTest extends LocalFileMetadataTestCase {
+
+ @Before
+ public void setup() throws Exception {
+ createTestMetadata();
+ }
+
+ @After
+ public void after() throws Exception {
+ cleanupTestMetadata();
+ }
+
+ @Test
+ public void testBasic() throws Exception {
+ TableDesc siteTable = MetadataManager.getInstance(getTestConfig()).getTableDesc("EDW.TEST_SITES");
+ TableDesc categoryTable = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.test_category_groupings");
+ LookupBytesTable lookup;
+
+ System.out.println("============================================================================");
+
+ File f = new File(LOCALMETA_TEST_DATA + "/data/EDW.TEST_SITES.csv");
+ lookup = new LookupBytesTable(siteTable, new String[] { "SITE_ID" }, new DFSFileTable("file://" + f.getAbsolutePath(), 10));
+ lookup.dump();
+
+ System.out.println("============================================================================");
+
+ f = new File(LOCALMETA_TEST_DATA + "/data/DEFAULT.TEST_CATEGORY_GROUPINGS.csv");
+ lookup = new LookupBytesTable(categoryTable, new String[] { "leaf_categ_id", "site_id" }, new DFSFileTable("file://" + f.getAbsolutePath(), 36));
+ lookup.dump();
+
+ System.out.println("============================================================================");
+
+ ByteArray k1 = new ByteArray(Bytes.toBytes("533"));
+ ByteArray k2 = new ByteArray(Bytes.toBytes("0"));
+ Array<ByteArray> key = new Array<ByteArray>(new ByteArray[] { k1, k2 });
+ System.out.println(lookup.getRow(key));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java
new file mode 100644
index 0000000..2db139a
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+/**
+ * Checks that {@code DFSFileTableReader} can read the first row of the
+ * {@code DW_SITES} test resource with automatic delimiter detection.
+ *
+ * @author yangli9
+ */
+public class TableReaderTest {
+
+    /**
+     * Reads the first row of the test file and compares it with the expected
+     * column values.
+     *
+     * @throws IOException if the reader fails to open, read or close the file
+     */
+    @Test
+    public void testBasicReader() throws IOException {
+        File f = new File("src/test/resources/dict/DW_SITES");
+        DFSFileTableReader reader = new DFSFileTableReader("file://" + f.getAbsolutePath(), DFSFileTable.DELIM_AUTO, 10);
+        try {
+            // an empty or unreadable file must fail the test instead of skipping the check
+            assertTrue("expected at least one row in " + f, reader.next());
+            assertEquals("[-1, Korea Auction.co.kr, S, 48, 0, 111, 2009-02-11, , DW_OFFPLAT, ]", Arrays.toString(reader.getRow()));
+        } finally {
+            // release the reader even when an assertion fails
+            reader.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/invertedindex/pom.xml
----------------------------------------------------------------------
diff --git a/invertedindex/pom.xml b/invertedindex/pom.xml
index 2fc0220..2eec2ce 100644
--- a/invertedindex/pom.xml
+++ b/invertedindex/pom.xml
@@ -30,18 +30,15 @@
<dependencies>
+ <!--Kylin Jar -->
<dependency>
<groupId>org.apache.kylin</groupId>
- <artifactId>kylin-core-common</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
+ <artifactId>kylin-core-metadata</artifactId>
<version>${project.parent.version}</version>
</dependency>
-
- <!--Kylin Jar -->
<dependency>
<groupId>org.apache.kylin</groupId>
- <artifactId>kylin-core-metadata</artifactId>
+ <artifactId>kylin-core-dictionary</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
@@ -51,48 +48,18 @@
</dependency>
<dependency>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-configuration</groupId>
- <artifactId>commons-configuration</artifactId>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-httpclient</groupId>
- <artifactId>commons-httpclient</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>com.ning</groupId>
- <artifactId>compress-lzf</artifactId>
- </dependency>
- <dependency>
<groupId>com.n3twork.druid</groupId>
<artifactId>extendedset</artifactId>
</dependency>
<!-- Env & Test -->
-
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <version>${project.parent.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
index faf9079..a8a2244 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
@@ -18,25 +18,29 @@
package org.apache.kylin.invertedindex;
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonManagedReference;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.*;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.LookupDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.metadata.realization.SQLDigest;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonManagedReference;
+import com.fasterxml.jackson.annotation.JsonProperty;
/**
* @author honma
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
index f478caf..547ff15 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
@@ -20,6 +20,7 @@ package org.apache.kylin.invertedindex;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
@@ -30,6 +31,7 @@ import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.DistinctColumnValuesProvider;
import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
@@ -130,7 +132,7 @@ public class IIManager implements IRealizationProvider {
return result;
}
- public void buildInvertedIndexDictionary(IISegment iiSeg, String factColumnsPath) throws IOException {
+ public void buildInvertedIndexDictionary(IISegment iiSeg, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
logger.info("Start building ii dictionary");
DictionaryManager dictMgr = getDictionaryManager();
IIDesc iiDesc = iiSeg.getIIInstance().getDescriptor();
@@ -140,7 +142,7 @@ public class IIManager implements IRealizationProvider {
continue;
}
- DictionaryInfo dict = dictMgr.buildDictionary(iiDesc.getModel(), "true", column, factColumnsPath);
+ DictionaryInfo dict = dictMgr.buildDictionary(iiDesc.getModel(), "true", column, factTableValueProvider);
iiSeg.putDictResPath(column, dict.getResourcePath());
}
updateII(iiSeg.getIIInstance());