You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/06/01 05:43:14 UTC
[kylin] 04/08: KYLIN-3376 Some improvements for lookup table -
query change
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 5a96f8b46ce73684a19a781a7aa0ce6a587900c7
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Thu May 10 13:09:11 2018 +0800
KYLIN-3376 Some improvements for lookup table - query change
---
.../apache/kylin/cube/CubeCapabilityChecker.java | 2 +-
core-dictionary/pom.xml | 5 +
.../kylin/dict/lookup/LookupStringTable.java | 12 +-
.../org/apache/kylin/dict/lookup/LookupTable.java | 4 +-
.../dict/lookup/cache/RocksDBLookupBuilder.java | 83 ++++
.../dict/lookup/cache/RocksDBLookupRowEncoder.java | 70 ++++
.../dict/lookup/cache/RocksDBLookupTable.java | 113 ++++++
.../dict/lookup/cache/RocksDBLookupTableCache.java | 420 +++++++++++++++++++++
.../lookup/cache/RocksDBLookupRowEncoderTest.java | 80 ++++
.../lookup/cache/RocksDBLookupTableCacheTest.java | 220 +++++++++++
.../dict/lookup/cache/RocksDBLookupTableTest.java | 161 ++++++++
.../org/apache/kylin/job/SelfStopExecutable.java | 2 +-
.../kylin/storage/gtrecord/CubeTupleConverter.java | 63 ++--
.../storage/gtrecord/GTCubeStorageQueryBase.java | 14 +-
.../kylin/storage/gtrecord/ITupleConverter.java | 3 +-
.../storage/gtrecord/SegmentCubeTupleIterator.java | 1 +
.../storage/translate/DerivedFilterTranslator.java | 6 +-
pom.xml | 6 +
.../query/enumerator/LookupTableEnumerator.java | 19 +-
19 files changed, 1224 insertions(+), 60 deletions(-)
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 5dffd96..e12b689 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -75,7 +75,7 @@ public class CubeCapabilityChecker {
}
} else {
//for non query-on-facttable
- if (cube.getSegments().get(0).getSnapshots().containsKey(digest.factTable)) {
+ if (cube.getSegments().get(0).getSnapshots().containsKey(digest.factTable) || cube.getSnapshots().containsKey(digest.factTable)) {
Set<TblColRef> dimCols = Sets.newHashSet(cube.getModel().findFirstTable(digest.factTable).getColumns());
diff --git a/core-dictionary/pom.xml b/core-dictionary/pom.xml
index 7e8bee1..d82d204 100644
--- a/core-dictionary/pom.xml
+++ b/core-dictionary/pom.xml
@@ -38,6 +38,11 @@
<artifactId>kylin-core-metadata</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.rocksdb</groupId>
+ <artifactId>rocksdbjni</artifactId>
+ </dependency>
+
<!-- Env & Test -->
<dependency>
<groupId>org.apache.kylin</groupId>
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
index 886de22..1d0348a 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
@@ -20,6 +20,7 @@ package org.apache.kylin.dict.lookup;
import java.io.IOException;
import java.util.Comparator;
+import java.util.Iterator;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.metadata.datatype.DataType;
@@ -31,7 +32,7 @@ import org.apache.kylin.source.IReadableTable;
* @author yangli9
*
*/
-public class LookupStringTable extends LookupTable<String> {
+public class LookupStringTable extends LookupTable<String> implements ILookupTable{
private static final Comparator<String> dateStrComparator = new Comparator<String>() {
@Override
@@ -109,4 +110,13 @@ public class LookupStringTable extends LookupTable<String> {
return String.class;
}
+ @Override
+ public Iterator<String[]> iterator() {
+ return data.values().iterator();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
index a99ef29..e1123fa 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
@@ -22,10 +22,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.IOUtils;
import org.apache.kylin.common.util.Array;
@@ -53,7 +53,7 @@ abstract public class LookupTable<T> {
this.tableDesc = tableDesc;
this.keyColumns = keyColumns;
this.table = table;
- this.data = new ConcurrentHashMap<Array<T>, T[]>();
+ this.data = new HashMap<Array<T>, T[]>();
init();
}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupBuilder.java
new file mode 100644
index 0000000..269684e
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupBuilder.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.rocksdb.util.SizeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Writes every row of a source {@link ILookupTable} into a local RocksDB store at {@code dbPath},
+ * replacing whatever was stored there before.
+ */
+public class RocksDBLookupBuilder {
+    private static final Logger logger = LoggerFactory.getLogger(RocksDBLookupBuilder.class);
+
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    private String dbPath;
+    private TableDesc tableDesc;
+    private RocksDBLookupRowEncoder encoder;
+    // number of rows collected in one WriteBatch before it is written to RocksDB
+    private int writeBatchSize;
+
+    public RocksDBLookupBuilder(TableDesc tableDesc, String[] keyColumns, String dbPath) {
+        this.tableDesc = tableDesc;
+        this.encoder = new RocksDBLookupRowEncoder(tableDesc, keyColumns);
+        this.dbPath = dbPath;
+        this.writeBatchSize = 500;
+    }
+
+    /**
+     * Creates the options used to open the store for building. {@link Options} wraps native
+     * memory, so the caller must close the returned instance (after closing the DB opened with it).
+     */
+    private Options createOptions() {
+        return new Options().setCreateIfMissing(true).setWriteBufferSize(8 * SizeUnit.KB)
+                .setMaxWriteBufferNumber(3).setMaxBackgroundCompactions(5)
+                .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
+                .setCompactionStyle(CompactionStyle.UNIVERSAL);
+    }
+
+    /**
+     * Rebuilds the store at {@code dbPath} from all rows of {@code srcLookupTable}, writing them in
+     * batches of {@code writeBatchSize}. The source table is iterated but not closed here.
+     *
+     * @throws RuntimeException wrapping the {@link RocksDBException} if the store cannot be opened or written
+     */
+    public void build(ILookupTable srcLookupTable) {
+        File dbFolder = new File(dbPath);
+        if (dbFolder.exists()) {
+            logger.info("remove rocksdb folder:{} to rebuild table cache:{}", dbPath, tableDesc.getIdentity());
+            FileUtils.deleteQuietly(dbFolder);
+        } else {
+            logger.info("create new rocksdb folder:{} for table cache:{}", dbPath, tableDesc.getIdentity());
+            dbFolder.mkdirs();
+        }
+        logger.info("start to build lookup table:{} to rocks db:{}", tableDesc.getIdentity(), dbPath);
+        // all four resources own native memory; they are closed in reverse order, DB before its options
+        try (Options options = createOptions();
+                RocksDB rocksDB = RocksDB.open(options, dbPath);
+                WriteOptions writeOptions = new WriteOptions();
+                WriteBatch batch = new WriteBatch()) {
+            for (String[] row : srcLookupTable) {
+                KV kv = encoder.encode(row);
+                batch.put(kv.getKey(), kv.getValue());
+                if (batch.count() >= writeBatchSize) {
+                    rocksDB.write(writeOptions, batch);
+                    batch.clear();
+                }
+            }
+            // write the last, partially filled batch
+            if (batch.count() > 0) {
+                rocksDB.write(writeOptions, batch);
+            }
+        } catch (RocksDBException e) {
+            throw new RuntimeException("error when write data to rocks db", e);
+        }
+
+        logger.info("source table:{} has been written to rocks db:{}", tableDesc.getIdentity(), dbPath);
+    }
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoder.java
new file mode 100644
index 0000000..7455a7b
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoder.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import org.apache.kylin.dict.lookup.AbstractLookupRowEncoder;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV;
+import org.apache.kylin.metadata.model.TableDesc;
+
+/**
+ * Encodes an original lookup table row into a RocksDB key/value pair and decodes it back.
+ * The key carries the key columns and the value carries the remaining columns; both parts are
+ * serialized with the base class's {@code encodeStringsWithLenPfx}.
+ */
+public class RocksDBLookupRowEncoder extends AbstractLookupRowEncoder<KV> {
+
+    public RocksDBLookupRowEncoder(TableDesc tableDesc, String[] keyColumns) {
+        super(tableDesc, keyColumns);
+    }
+
+    /**
+     * Serializes one row into its key part and value part.
+     */
+    public KV encode(String[] row) {
+        String[] keyParts = getKeyData(row);
+        String[] valueParts = getValueData(row);
+        return new KV(encodeStringsWithLenPfx(keyParts, false), encodeStringsWithLenPfx(valueParts, true));
+    }
+
+    /**
+     * Rebuilds a full row (of {@code columnsNum} columns) from a stored key/value pair.
+     */
+    public String[] decode(KV kv) {
+        String[] row = new String[columnsNum];
+        // key and value bytes are decoded into the same array, using the key and value column indexes
+        decodeFromLenPfxBytes(kv.getKey(), keyIndexes, row);
+        decodeFromLenPfxBytes(kv.getValue(), valueIndexes, row);
+        return row;
+    }
+
+    /**
+     * Holder for an encoded key and its encoded value.
+     */
+    public static class KV {
+        private final byte[] key;
+        private final byte[] value;
+
+        public KV(byte[] key, byte[] value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        public byte[] getKey() {
+            return key;
+        }
+
+        public byte[] getValue() {
+            return value;
+        }
+    }
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTable.java
new file mode 100644
index 0000000..4f0c816
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTable.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ILookupTable} served from a RocksDB store opened read-only, e.g. one written by
+ * {@link RocksDBLookupBuilder}. Callers must {@link #close()} it to release the native handles.
+ */
+public class RocksDBLookupTable implements ILookupTable {
+    private static final Logger logger = LoggerFactory.getLogger(RocksDBLookupTable.class);
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    private TableDesc tableDesc;
+    private RocksDB rocksDB;
+    private Options options;
+
+    private RocksDBLookupRowEncoder rowEncoder;
+
+    /**
+     * Opens the store at {@code dbPath} in read-only mode.
+     *
+     * @throws RuntimeException wrapping the {@link RocksDBException} if the store cannot be opened
+     */
+    public RocksDBLookupTable(TableDesc tableDesc, String[] keyColumns, String dbPath) {
+        this.tableDesc = tableDesc;
+        // build the encoder first so that a failure here cannot leak the native options
+        this.rowEncoder = new RocksDBLookupRowEncoder(tableDesc, keyColumns);
+        this.options = new Options();
+        try {
+            this.rocksDB = RocksDB.openReadOnly(options, dbPath);
+        } catch (RocksDBException e) {
+            // the instance is never returned, so close() will not run: release the options now
+            options.close();
+            throw new RuntimeException("cannot open rocks db in path:" + dbPath, e);
+        }
+    }
+
+    /**
+     * Looks up the row stored under {@code key}.
+     *
+     * @return the decoded row, or {@code null} if no row is stored under the key
+     */
+    @Override
+    public String[] getRow(Array<String> key) {
+        byte[] encodeKey = rowEncoder.encodeStringsWithLenPfx(key.data, false);
+        try {
+            byte[] value = rocksDB.get(encodeKey);
+            if (value == null) {
+                return null;
+            }
+            return rowEncoder.decode(new KV(encodeKey, value));
+        } catch (RocksDBException e) {
+            throw new RuntimeException("error when get key from rocksdb", e);
+        }
+    }
+
+    /**
+     * Scans all stored rows, starting from the first key. The native iterator is closed once
+     * {@code hasNext()} reports the end; an iteration abandoned earlier leaves it open.
+     */
+    @Override
+    public Iterator<String[]> iterator() {
+        final RocksIterator rocksIterator = rocksDB.newIterator();
+        rocksIterator.seekToFirst();
+
+        return new Iterator<String[]>() {
+            int counter;
+            // true once rocksIterator has been closed; a closed native iterator must not be used again
+            boolean exhausted;
+
+            @Override
+            public boolean hasNext() {
+                if (exhausted) {
+                    return false;
+                }
+                if (!rocksIterator.isValid()) {
+                    rocksIterator.close();
+                    exhausted = true;
+                    return false;
+                }
+                return true;
+            }
+
+            @Override
+            public String[] next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException("no more rows in rocksDB");
+                }
+                counter++;
+                if (counter % 100000 == 0) {
+                    logger.info("scanned {} rows from rocksDB", counter);
+                }
+                String[] result = rowEncoder.decode(new KV(rocksIterator.key(), rocksIterator.value()));
+                rocksIterator.next();
+                return result;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("not support operation");
+            }
+        };
+    }
+
+    /**
+     * Closes the DB and then the options it was opened with; RocksDB requires the options to
+     * outlive the DB instance.
+     */
+    @Override
+    public void close() throws IOException {
+        if (rocksDB != null) {
+            rocksDB.close();
+        }
+        options.close();
+    }
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java
new file mode 100644
index 0000000..460c448
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Predicate;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.Weigher;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+
+/**
+ * Local disk cache for external lookup table snapshots. Each snapshot is stored as a RocksDB
+ * store under {@code <base path>/<table name>/<snapshot id>/db}, with a {@code STATE} file next
+ * to it; at startup only snapshots whose state is {@code AVAILABLE} are registered. Entries are
+ * weighed by their size in KB, and evicting or invalidating an entry deletes its folder.
+ */
+public class RocksDBLookupTableCache implements IExtLookupTableCache {
+    private static final Logger logger = LoggerFactory.getLogger(RocksDBLookupTableCache.class);
+
+    private static final String CACHE_TYPE_ROCKSDB = "rocksdb";
+    private static final String STATE_FILE = "STATE";
+    private static final String DB_FILE = "db";
+
+    private String basePath;
+    private long maxCacheSizeInKB;
+    private Cache<String, CachedTableInfo> tablesCache;
+
+    // resource paths of the snapshots whose cache is currently being built
+    private ConcurrentMap<String, Boolean> inBuildingTables = Maps.newConcurrentMap();
+
+    private ExecutorService cacheBuildExecutor;
+    private ScheduledExecutorService cacheStateCheckExecutor;
+    private CacheStateChecker cacheStateChecker;
+
+    // static cached instances
+    private static final ConcurrentMap<KylinConfig, RocksDBLookupTableCache> SERVICE_CACHE = new ConcurrentHashMap<>();
+
+    /**
+     * Returns the cache instance bound to {@code config}, creating it on first use.
+     */
+    public static RocksDBLookupTableCache getInstance(KylinConfig config) {
+        RocksDBLookupTableCache r = SERVICE_CACHE.get(config);
+        if (r == null) {
+            synchronized (RocksDBLookupTableCache.class) {
+                r = SERVICE_CACHE.get(config);
+                if (r == null) {
+                    r = new RocksDBLookupTableCache(config);
+                    SERVICE_CACHE.put(config, r);
+                    if (SERVICE_CACHE.size() > 1) {
+                        logger.warn("More than one singleton exist");
+                    }
+                }
+            }
+        }
+        return r;
+    }
+
+    /**
+     * Drops all cached instances. The executors of the dropped instances are not shut down.
+     */
+    public static void clearCache() {
+        // same lock as getInstance(), so a clear cannot interleave with an instance creation
+        synchronized (RocksDBLookupTableCache.class) {
+            SERVICE_CACHE.clear();
+        }
+    }
+
+    // ============================================================================
+
+    private KylinConfig config;
+
+    private RocksDBLookupTableCache(KylinConfig config) {
+        this.config = config;
+        init();
+    }
+
+    private void init() {
+        this.basePath = getCacheBasePath(config);
+
+        // the configured limit is in GB, entries are weighed in KB
+        this.maxCacheSizeInKB = (long) (config.getExtTableSnapshotLocalCacheMaxSizeGB() * 1024 * 1024);
+        this.tablesCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, CachedTableInfo>() {
+            @Override
+            public void onRemoval(RemovalNotification<String, CachedTableInfo> notification) {
+                CachedTableInfo removed = notification.getValue();
+                logger.warn(removed + " is removed " + "because of " + notification.getCause());
+                // A REPLACED entry shares its folder with the entry replacing it (the folder is derived
+                // from the same table name and snapshot id as the key), so deleting it here would wipe
+                // the cache that was just rebuilt.
+                if (removed != null && notification.getCause() != RemovalCause.REPLACED) {
+                    removed.cleanStorage();
+                }
+            }
+        }).maximumWeight(maxCacheSizeInKB).weigher(new Weigher<String, CachedTableInfo>() {
+            @Override
+            public int weigh(String key, CachedTableInfo value) {
+                return value.getSizeInKB();
+            }
+        }).build();
+        restoreCacheState();
+        cacheStateChecker = new CacheStateChecker();
+        initExecutors();
+    }
+
+    /**
+     * Resolves the configured local cache path (relative paths are taken from the Kylin home)
+     * and appends the {@code rocksdb} sub folder.
+     */
+    protected static String getCacheBasePath(KylinConfig config) {
+        String basePath = config.getExtTableSnapshotLocalCachePath();
+        if (!basePath.startsWith("/")) {
+            basePath = KylinConfig.getKylinHome() + File.separator + basePath;
+        }
+        return basePath + File.separator + CACHE_TYPE_ROCKSDB;
+    }
+
+    /**
+     * Registers in {@code tablesCache} every snapshot folder already present on disk whose state
+     * file says {@code AVAILABLE}; creates the base folder if it is missing.
+     */
+    private void restoreCacheState() {
+        File dbBaseFolder = new File(basePath);
+        if (!dbBaseFolder.exists()) {
+            dbBaseFolder.mkdirs();
+        }
+        Map<String, File[]> tableSnapshotsFileMap = getCachedTableSnapshotsFolders(dbBaseFolder);
+        for (Entry<String, File[]> tableSnapshotsEntry : tableSnapshotsFileMap.entrySet()) {
+            for (File snapshotFolder : tableSnapshotsEntry.getValue()) {
+                initSnapshotState(tableSnapshotsEntry.getKey(), snapshotFolder);
+            }
+        }
+    }
+
+    /**
+     * Maps each table folder name under {@code dbBaseFolder} to its snapshot sub folders. Table
+     * folders whose content cannot be listed are left out, so the returned arrays are never null.
+     */
+    private Map<String, File[]> getCachedTableSnapshotsFolders(File dbBaseFolder) {
+        Map<String, File[]> result = Maps.newHashMap();
+        File[] tableFolders = dbBaseFolder.listFiles(new FileFilter() {
+            @Override
+            public boolean accept(File file) {
+                return file.isDirectory();
+            }
+        });
+        if (tableFolders == null) {
+            return result;
+        }
+
+        for (File tableFolder : tableFolders) {
+            String tableName = tableFolder.getName();
+            File[] snapshotFolders = tableFolder.listFiles(new FileFilter() {
+                @Override
+                public boolean accept(File snapshotFile) {
+                    return snapshotFile.isDirectory();
+                }
+            });
+            // listFiles() returns null on an I/O error; callers iterate the arrays without a null check
+            if (snapshotFolders == null) {
+                logger.warn("cannot list snapshot cache folders under:{}", tableFolder.getAbsolutePath());
+                continue;
+            }
+            result.put(tableName, snapshotFolders);
+        }
+        return result;
+    }
+
+    /**
+     * Adds one on-disk snapshot folder to {@code tablesCache} if its state file reads {@code AVAILABLE}.
+     */
+    private void initSnapshotState(String tableName, File snapshotCacheFolder) {
+        String snapshotID = snapshotCacheFolder.getName();
+        File stateFile = getCacheStateFile(snapshotCacheFolder.getAbsolutePath());
+        if (stateFile.exists()) {
+            try {
+                String stateStr = Files.toString(stateFile, Charsets.UTF_8).trim();
+                String resourcePath = ExtTableSnapshotInfo.getResourcePath(tableName, snapshotID);
+                if (CacheState.AVAILABLE.name().equals(stateStr)) {
+                    tablesCache.put(resourcePath, new CachedTableInfo(snapshotCacheFolder.getAbsolutePath()));
+                }
+            } catch (IOException e) {
+                logger.error("error to read state file:" + stateFile.getAbsolutePath(), e);
+            }
+        }
+    }
+
+    private void initExecutors() {
+        // NOTE(review): with corePoolSize 0 and an unbounded queue, ThreadPoolExecutor starts only one
+        // worker (extra threads are created only when the queue is full), so cache builds run one at a
+        // time and the maximum of 50 is never reached.
+        this.cacheBuildExecutor = new ThreadPoolExecutor(0, 50, 60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("lookup-cache-build-thread"));
+        this.cacheStateCheckExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(
+                "lookup-cache-state-checker"));
+        cacheStateCheckExecutor.scheduleAtFixedRate(cacheStateChecker, 10, 10 * 60, TimeUnit.SECONDS); // check every 10 minutes
+    }
+
+    /**
+     * Returns a RocksDB-backed table for the snapshot, or {@code null} when no ready cache exists
+     * (it is being built, or it is missing; in the latter case an asynchronous build is started if
+     * {@code buildIfNotExist} is true). The caller must close the returned table.
+     */
+    @Override
+    public ILookupTable getCachedLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo, boolean buildIfNotExist) {
+        String resourcePath = extTableSnapshotInfo.getResourcePath();
+        if (inBuildingTables.containsKey(resourcePath)) {
+            logger.info("cache is in building for snapshot:" + resourcePath);
+            return null;
+        }
+        CachedTableInfo cachedTableInfo = tablesCache.getIfPresent(resourcePath);
+        if (cachedTableInfo == null) {
+            if (buildIfNotExist) {
+                buildSnapshotCache(tableDesc, extTableSnapshotInfo, getSourceLookupTable(tableDesc, extTableSnapshotInfo));
+            }
+            logger.info("no available cache ready for the table snapshot:" + extTableSnapshotInfo.getResourcePath());
+            return null;
+        }
+
+        String[] keyColumns = extTableSnapshotInfo.getKeyColumns();
+        String dbPath = getSnapshotStorePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId());
+        return new RocksDBLookupTable(tableDesc, keyColumns, dbPath);
+    }
+
+    private ILookupTable getSourceLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo) {
+        return LookupProviderFactory.getExtLookupTableWithoutCache(tableDesc, extTableSnapshotInfo);
+    }
+
+    /**
+     * Starts an asynchronous build of the local cache for the snapshot, unless the snapshot is too
+     * large (more than 2/3 of the cache capacity) or a build for it is already running. Build
+     * failures are logged. {@code sourceTable} is not closed here.
+     */
+    @Override
+    public void buildSnapshotCache(final TableDesc tableDesc, final ExtTableSnapshotInfo extTableSnapshotInfo, final ILookupTable sourceTable) {
+        if (extTableSnapshotInfo.getSignature().getSize() / 1024 > maxCacheSizeInKB * 2 / 3) {
+            logger.warn("the size is to large to build to cache for snapshot:{}, size:{}, skip cache building",
+                    extTableSnapshotInfo.getResourcePath(), extTableSnapshotInfo.getSignature().getSize());
+            return;
+        }
+        final String[] keyColumns = extTableSnapshotInfo.getKeyColumns();
+        final String cachePath = getSnapshotCachePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId());
+        final String dbPath = getSnapshotStorePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId());
+        final String snapshotResPath = extTableSnapshotInfo.getResourcePath();
+
+        // putIfAbsent checks and marks atomically, so at most one build per snapshot is submitted
+        if (inBuildingTables.putIfAbsent(snapshotResPath, true) != null) {
+            logger.info("there is already snapshot cache in building for snapshot:{}, skip it", snapshotResPath);
+            return;
+        }
+        try {
+            cacheBuildExecutor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        RocksDBLookupBuilder builder = new RocksDBLookupBuilder(tableDesc, keyColumns, dbPath);
+                        builder.build(sourceTable);
+                        // publish the AVAILABLE state before clearing the in-building mark, so no
+                        // reader can see a finished build whose state has not been written yet
+                        saveSnapshotCacheState(extTableSnapshotInfo, cachePath);
+                    } catch (Exception e) {
+                        // the Future returned by submit() is discarded, so an uncaught failure would be lost
+                        logger.error("error when build snapshot cache for snapshot:" + snapshotResPath, e);
+                    } finally {
+                        inBuildingTables.remove(snapshotResPath);
+                    }
+                }
+            });
+        } catch (RuntimeException e) {
+            // the task was not accepted (e.g. the executor is shut down): do not leave the mark behind
+            inBuildingTables.remove(snapshotResPath);
+            throw e;
+        }
+    }
+
+    @Override
+    public void removeSnapshotCache(ExtTableSnapshotInfo extTableSnapshotInfo) {
+        tablesCache.invalidate(extTableSnapshotInfo.getResourcePath());
+    }
+
+    /**
+     * Reports {@code IN_BUILDING} while a build is running, otherwise the state recorded in the
+     * snapshot's state file, or {@code NONE} when that file is missing, unreadable or holds an
+     * unknown value.
+     */
+    @Override
+    public CacheState getCacheState(ExtTableSnapshotInfo extTableSnapshotInfo) {
+        String resourcePath = extTableSnapshotInfo.getResourcePath();
+        if (inBuildingTables.containsKey(resourcePath)) {
+            return CacheState.IN_BUILDING;
+        }
+        File stateFile = getCacheStateFile(getSnapshotCachePath(extTableSnapshotInfo.getTableName(),
+                extTableSnapshotInfo.getId()));
+        if (!stateFile.exists()) {
+            return CacheState.NONE;
+        }
+        try {
+            String stateString = Files.toString(stateFile, Charsets.UTF_8).trim();
+            return CacheState.valueOf(stateString);
+        } catch (IOException e) {
+            logger.error("error when read state file", e);
+        } catch (IllegalArgumentException e) {
+            logger.error("unknown cache state in file:" + stateFile.getAbsolutePath(), e);
+        }
+        return CacheState.NONE;
+    }
+
+    public long getTotalCacheSize() {
+        return FileUtils.sizeOfDirectory(new File(getCacheBasePath(config)));
+    }
+
+    public void checkCacheState() {
+        cacheStateChecker.run();
+    }
+
+    /**
+     * Writes {@code AVAILABLE} to the snapshot's state file and registers the snapshot in
+     * {@code tablesCache}.
+     */
+    private void saveSnapshotCacheState(ExtTableSnapshotInfo extTableSnapshotInfo, String cachePath) {
+        File stateFile = getCacheStateFile(getSnapshotCachePath(extTableSnapshotInfo.getTableName(),
+                extTableSnapshotInfo.getId()));
+        try {
+            Files.write(CacheState.AVAILABLE.name(), stateFile, Charsets.UTF_8);
+            tablesCache.put(extTableSnapshotInfo.getResourcePath(), new CachedTableInfo(cachePath));
+        } catch (IOException e) {
+            throw new RuntimeException("error when write cache state for snapshot:"
+                    + extTableSnapshotInfo.getResourcePath(), e);
+        }
+    }
+
+    private File getCacheStateFile(String snapshotCacheFolder) {
+        String stateFilePath = snapshotCacheFolder + File.separator + STATE_FILE;
+        return new File(stateFilePath);
+    }
+
+    protected String getSnapshotStorePath(String tableName, String snapshotID) {
+        return getSnapshotCachePath(tableName, snapshotID) + File.separator + DB_FILE;
+    }
+
+    protected String getSnapshotCachePath(String tableName, String snapshotID) {
+        return basePath + File.separator + tableName + File.separator + snapshotID;
+    }
+
+    /**
+     * Deletes every cached snapshot folder whose resource path is no longer reported by
+     * {@link ExtTableSnapshotInfoManager#getAllExtSnapshotResPaths()}, and drops it from
+     * {@code tablesCache}. Exceptions are logged rather than thrown.
+     */
+    private class CacheStateChecker implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                String cacheBasePath = getCacheBasePath(config);
+                logger.info("check snapshot local cache state, local path:{}", cacheBasePath);
+                File baseFolder = new File(cacheBasePath);
+                if (!baseFolder.exists()) {
+                    return;
+                }
+                Map<String, File[]> tableSnapshotsFileMap = getCachedTableSnapshotsFolders(baseFolder);
+                List<Pair<String, File>> allCachedSnapshots = Lists.newArrayList();
+                for (Entry<String, File[]> tableSnapshotsEntry : tableSnapshotsFileMap.entrySet()) {
+                    String tableName = tableSnapshotsEntry.getKey();
+                    for (File file : tableSnapshotsEntry.getValue()) {
+                        String snapshotID = file.getName();
+                        allCachedSnapshots
+                                .add(new Pair<>(ExtTableSnapshotInfo.getResourcePath(tableName, snapshotID), file));
+                    }
+                }
+
+                final Set<String> activeSnapshotSet = ExtTableSnapshotInfoManager.getInstance(config).getAllExtSnapshotResPaths();
+
+                List<Pair<String, File>> toRemovedCachedSnapshots = Lists.newArrayList(FluentIterable.from(
+                        allCachedSnapshots).filter(new Predicate<Pair<String, File>>() {
+                            @Override
+                            public boolean apply(@Nullable Pair<String, File> input) {
+                                return !activeSnapshotSet.contains(input.getFirst());
+                            }
+                        }));
+                for (Pair<String, File> toRemovedCachedSnapshot : toRemovedCachedSnapshots) {
+                    File snapshotCacheFolder = toRemovedCachedSnapshot.getSecond();
+                    logger.info("removed cache file:{}, it is not referred by any cube",
+                            snapshotCacheFolder.getAbsolutePath());
+                    try {
+                        FileUtils.deleteDirectory(snapshotCacheFolder);
+                    } catch (IOException e) {
+                        logger.error("fail to remove folder:" + snapshotCacheFolder.getAbsolutePath(), e);
+                    }
+                    tablesCache.invalidate(toRemovedCachedSnapshot.getFirst());
+                }
+            } catch (Exception e) {
+                logger.error("error happens when check cache state", e);
+            }
+
+        }
+    }
+
+    /**
+     * On-disk footprint of one cached snapshot, measured once at construction.
+     */
+    private static class CachedTableInfo {
+        private String cachePath;
+
+        private long dbSize;
+
+        public CachedTableInfo(String cachePath) {
+            this.cachePath = cachePath;
+            this.dbSize = FileUtils.sizeOfDirectory(new File(cachePath));
+        }
+
+        /** Size in KB, capped at Integer.MAX_VALUE because Guava rejects negative weights. */
+        public int getSizeInKB() {
+            return (int) Math.min(dbSize / 1024, Integer.MAX_VALUE);
+        }
+
+        public void cleanStorage() {
+            logger.info("clean cache storage for path:" + cachePath);
+            try {
+                FileUtils.deleteDirectory(new File(cachePath));
+            } catch (IOException e) {
+                logger.error("file delete fail:" + cachePath, e);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "CachedTableInfo{cachePath=" + cachePath + ", dbSize=" + dbSize + "}";
+        }
+    }
+
+    /**
+     * A simple named thread factory.
+     */
+    private static class NamedThreadFactory implements ThreadFactory {
+        private final ThreadGroup group;
+        private final AtomicInteger threadNumber = new AtomicInteger(1);
+        private final String namePrefix;
+
+        public NamedThreadFactory(String threadPrefix) {
+            final SecurityManager s = System.getSecurityManager();
+            this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+            this.namePrefix = threadPrefix + "-";
+        }
+
+        @Override
+        public Thread newThread(Runnable r) {
+            final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
+            t.setDaemon(true);
+            return t;
+        }
+    }
+}
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoderTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoderTest.java
new file mode 100644
index 0000000..ff5fdff
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoderTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RocksDBLookupRowEncoderTest extends LocalFileMetadataTestCase {
+    /** Descriptor of TEST_COUNTRY from the test metadata; every encoder in this test is built on it. */
+    private TableDesc countryTable;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        countryTable = TableMetadataManager.getInstance(config).getTableDesc("TEST_COUNTRY", "default");
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    /**
+     * Encodes {@code row} with a fresh encoder keyed on {@code keyColumns} and decodes the
+     * resulting key/value pair back into a row.
+     */
+    private String[] roundTrip(String[] keyColumns, String[] row) {
+        RocksDBLookupRowEncoder encoder = new RocksDBLookupRowEncoder(countryTable, keyColumns);
+        KV encoded = encoder.encode(row);
+        return encoder.decode(encoded);
+    }
+
+    @Test
+    public void testEnDeCode() {
+        String[] row = { "AD", "42.546245", "1.601554", "Andorra" };
+        assertArrayEquals(row, roundTrip(new String[] { "COUNTRY" }, row));
+    }
+
+    @Test
+    public void testEnDeCodeWithNullValue() {
+        String[] row = { "AD", "42.546245", "1.601554", null };
+        String[] decoded = roundTrip(new String[] { "COUNTRY" }, row);
+        assertNull(decoded[3]);
+        assertArrayEquals(row, decoded);
+    }
+
+    @Test
+    public void testEnDeCodeWithMultiKeys() {
+        String[] row = { "AD", "42.546245", "1.601554", "Andorra" };
+        assertArrayEquals(row, roundTrip(new String[] { "COUNTRY", "NAME" }, row));
+    }
+
+}
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCacheTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCacheTest.java
new file mode 100644
index 0000000..55b25b2
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCacheTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.IExtLookupProvider;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable.TableSignature;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+/**
+ */
+public class RocksDBLookupTableCacheTest extends LocalFileMetadataTestCase {
+ private static final String TABLE_COUNTRY = "DEFAULT.TEST_COUNTRY";
+ private static final String MOCK_EXT_LOOKUP = "mock";
+ private TableDesc tableDesc;
+ private KylinConfig kylinConfig;
+
+ @Before
+ public void setup() throws Exception {
+ createTestMetadata();
+ System.setProperty("KYLIN_HOME", ".");
+ TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+ kylinConfig = getTestConfig();
+ tableDesc = metadataManager.getTableDesc(TABLE_COUNTRY, "default");
+ cleanCache();
+ LookupProviderFactory.registerLookupProvider(MOCK_EXT_LOOKUP, MockedLookupProvider.class.getName());
+ }
+
+ private void cleanCache() {
+ FileUtils.deleteQuietly(new File(kylinConfig.getExtTableSnapshotLocalCachePath()));
+ }
+
+ @After
+ public void tearDown() {
+ cleanupTestMetadata();
+ cleanCache();
+ }
+
+ @Test
+ public void testBuildTableCache() throws Exception {
+ String snapshotID = UUID.randomUUID().toString();
+ ExtTableSnapshotInfo snapshotInfo = buildSnapshotCache(snapshotID, 100000);
+ assertEquals(CacheState.AVAILABLE, RocksDBLookupTableCache.getInstance(kylinConfig).getCacheState(snapshotInfo));
+ }
+
+ private ExtTableSnapshotInfo buildSnapshotCache(String snapshotID, int rowCnt) throws Exception {
+ ExtTableSnapshotInfo snapshotInfo = new ExtTableSnapshotInfo();
+ snapshotInfo.setTableName(TABLE_COUNTRY);
+ snapshotInfo.setUuid(snapshotID);
+ snapshotInfo.setStorageType(MOCK_EXT_LOOKUP);
+ snapshotInfo.setKeyColumns(new String[] { "COUNTRY" });
+ snapshotInfo.setRowCnt(rowCnt);
+ snapshotInfo.setSignature(new TableSignature("/test", rowCnt, System.currentTimeMillis()));
+
+ RocksDBLookupTableCache cache = RocksDBLookupTableCache.getInstance(kylinConfig);
+ cache.buildSnapshotCache(tableDesc, snapshotInfo, getLookupTableWithRandomData(rowCnt));
+
+ while (cache.getCacheState(snapshotInfo) == CacheState.IN_BUILDING) {
+ Thread.sleep(500);
+ }
+ return snapshotInfo;
+ }
+
+ @Test
+ public void testRestoreCacheFromFiles() throws Exception {
+ String snapshotID = UUID.randomUUID().toString();
+ String snapshotCacheBasePath = RocksDBLookupTableCache.getCacheBasePath(kylinConfig) + File.separator
+ + TABLE_COUNTRY + File.separator + snapshotID;
+ String dbPath = snapshotCacheBasePath + File.separator + "db";
+ RocksDBLookupBuilder builder = new RocksDBLookupBuilder(tableDesc, new String[] { "COUNTRY" }, dbPath);
+ builder.build(getLookupTableWithRandomData(10000));
+ String stateFilePath = snapshotCacheBasePath + File.separator + "STATE";
+ Files.write(CacheState.AVAILABLE.name(), new File(stateFilePath), Charsets.UTF_8);
+
+ RocksDBLookupTableCache cache = RocksDBLookupTableCache.getInstance(kylinConfig);
+ ExtTableSnapshotInfo snapshotInfo = new ExtTableSnapshotInfo();
+ snapshotInfo.setTableName(TABLE_COUNTRY);
+ snapshotInfo.setUuid(snapshotID);
+ snapshotInfo.setStorageType(MOCK_EXT_LOOKUP);
+ snapshotInfo.setKeyColumns(new String[] { "COUNTRY" });
+ ILookupTable lookupTable = cache.getCachedLookupTable(tableDesc, snapshotInfo, false);
+ int rowCnt = 0;
+ for (String[] strings : lookupTable) {
+ rowCnt++;
+ }
+ lookupTable.close();
+ assertEquals(10000, rowCnt);
+ }
+
+ @Test
+ public void testEvict() throws Exception {
+ kylinConfig.setProperty("kylin.snapshot.ext.local.cache.max-size-gb", "0.005");
+ int snapshotNum = 10;
+ int snapshotRowCnt = 100000;
+ for (int i = 0; i < snapshotNum; i++) {
+ buildSnapshotCache(UUID.randomUUID().toString(), snapshotRowCnt);
+ }
+ assertTrue(RocksDBLookupTableCache.getInstance(kylinConfig).getTotalCacheSize() < 0.006 * 1024 * 1024 * 1024);
+ }
+
+ @Test
+ public void testCheckCacheState() throws Exception {
+ ExtTableSnapshotInfo snapshotInfo = buildSnapshotCache(UUID.randomUUID().toString(), 1000);
+ RocksDBLookupTableCache cache = RocksDBLookupTableCache.getInstance(kylinConfig);
+ ILookupTable cachedLookupTable = cache.getCachedLookupTable(tableDesc, snapshotInfo, false);
+ assertNotNull(cachedLookupTable);
+ cachedLookupTable.close();
+ cache.checkCacheState();
+ String cacheLocalPath = cache.getSnapshotCachePath(snapshotInfo.getTableName(), snapshotInfo.getId());
+ assertFalse(new File(cacheLocalPath).exists());
+ cachedLookupTable = cache.getCachedLookupTable(tableDesc, snapshotInfo, false);
+ assertNull(cachedLookupTable);
+ }
+
+ public static class MockedLookupProvider implements IExtLookupProvider {
+
+ @Override
+ public ILookupTable getLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+ return getLookupTableWithRandomData(extTableSnapshot.getRowCnt());
+ }
+
+ @Override
+ public IExtLookupTableCache getLocalCache() {
+ return null;
+ }
+
+ @Override
+ public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+ return null;
+ }
+ }
+
+ private static ILookupTable getLookupTableWithRandomData(final long rowNum) {
+ return new ILookupTable() {
+ Random random = new Random();
+
+ @Override
+ public String[] getRow(Array<String> key) {
+ return new String[0];
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public Iterator<String[]> iterator() {
+ return new Iterator<String[]>() {
+ private int iterCnt = 0;
+
+ @Override
+ public boolean hasNext() {
+ return iterCnt < rowNum;
+ }
+
+ @Override
+ public String[] next() {
+ iterCnt++;
+ return genRandomRow(iterCnt);
+ }
+
+ @Override
+ public void remove() {
+
+ }
+ };
+ }
+
+ private String[] genRandomRow(int id) {
+ return new String[] { "keyyyyy" + id, String.valueOf(random.nextDouble()),
+ String.valueOf(random.nextDouble()), "Andorra" + id };
+ }
+ };
+ }
+
+}
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableTest.java
new file mode 100644
index 0000000..d5bbd40
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for {@link RocksDBLookupTable}. Before each test, {@code sourceRowNum} generated rows
+ * are written into a local RocksDB directory. The table is then scanned and queried by key.
+ */
+public class RocksDBLookupTableTest extends LocalFileMetadataTestCase {
+    /** Local directory holding everything this test writes; removed before and after each test. */
+    private static final String CACHE_ROOT = "lookup_cache";
+    private static final String DB_PATH = CACHE_ROOT + "/TEST_COUNTRY";
+
+    private TableDesc tableDesc;
+    private RocksDBLookupTable lookupTable;
+    private Random random;
+    private int sourceRowNum;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        this.random = new Random();
+        tableDesc = metadataManager.getTableDesc("TEST_COUNTRY", "default");
+        sourceRowNum = 10000;
+        genTestData();
+        lookupTable = new RocksDBLookupTable(tableDesc, new String[] { "COUNTRY" }, DB_PATH);
+    }
+
+    private void genTestData() {
+        removeTestDataIfExists();
+        File file = new File(DB_PATH);
+        file.mkdirs();
+        RocksDBLookupBuilder builder = new RocksDBLookupBuilder(tableDesc, new String[] { "COUNTRY" }, DB_PATH);
+        long start = System.currentTimeMillis();
+        builder.build(getLookupTableWithRandomData(sourceRowNum));
+        long take = System.currentTimeMillis() - start;
+        System.out.println("take:" + take + " ms to complete build");
+    }
+
+    private void removeTestDataIfExists() {
+        FileUtils.deleteQuietly(new File(CACHE_ROOT));
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        try {
+            // Close the DB before deleting its directory, so files are not removed out from
+            // under an open instance; some platforms refuse to delete open files. The null
+            // check covers a setup() that failed before the table was opened.
+            if (lookupTable != null) {
+                lookupTable.close();
+            }
+        } finally {
+            cleanupTestMetadata();
+            removeTestDataIfExists();
+        }
+    }
+
+    @Test
+    public void testIterator() throws Exception {
+        System.out.println("start iterator table");
+        long start = System.currentTimeMillis();
+        Iterator<String[]> iter = lookupTable.iterator();
+        int count = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            count++;
+        }
+        long take = System.currentTimeMillis() - start;
+        System.out.println("scan " + count + " rows, take " + take + " ms");
+        assertEquals(sourceRowNum, count);
+    }
+
+    @Test
+    public void testGet() throws Exception {
+        int getNum = 3000;
+        List<String[]> keys = Lists.newArrayList();
+        for (int i = 0; i < getNum; i++) {
+            // generated row ids run from 1 to sourceRowNum, so shift nextInt's [0, sourceRowNum) by one
+            String[] keyi = new String[] { "keyyyyy" + (random.nextInt(sourceRowNum) + 1) };
+            keys.add(keyi);
+        }
+
+        int missCount = 0;
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < getNum; i++) {
+            String[] row = lookupTable.getRow(new Array<>(keys.get(i)));
+            if (row == null) {
+                System.out.println("null value for key:" + Arrays.toString(keys.get(i)));
+                missCount++;
+            }
+        }
+        long take = System.currentTimeMillis() - start;
+        System.out.println("muliti get " + getNum + " rows, take " + take + " ms");
+        assertEquals("every generated key should be found in the table", 0, missCount);
+    }
+
+    /**
+     * Returns an in-memory lookup table that generates {@code rowNum} rows on iteration. Row ids
+     * run from 1 to {@code rowNum}; row {@code i} has key {@code "keyyyyy" + i}.
+     */
+    private ILookupTable getLookupTableWithRandomData(final int rowNum) {
+        return new ILookupTable() {
+            @Override
+            public String[] getRow(Array<String> key) {
+                return new String[0];
+            }
+
+            @Override
+            public void close() throws IOException {
+                // nothing to release: rows are generated on the fly
+            }
+
+            @Override
+            public Iterator<String[]> iterator() {
+                return new Iterator<String[]>() {
+                    private int iterCnt = 0;
+
+                    @Override
+                    public boolean hasNext() {
+                        return iterCnt < rowNum;
+                    }
+
+                    @Override
+                    public String[] next() {
+                        if (!hasNext()) {
+                            throw new java.util.NoSuchElementException();
+                        }
+                        iterCnt++;
+                        return genRandomRow(iterCnt);
+                    }
+
+                    @Override
+                    public void remove() {
+                        // generated rows cannot be removed; intentionally a no-op in this mock
+                    }
+                };
+            }
+        };
+    }
+
+    private String[] genRandomRow(int id) {
+        return new String[] { "keyyyyy" + id, String.valueOf(random.nextDouble()), String.valueOf(random.nextDouble()),
+                "Andorra" + id };
+    }
+
+}
diff --git a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
index 09f5a7a..ddeeafa 100644
--- a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
@@ -36,7 +36,7 @@ public class SelfStopExecutable extends BaseTestExecutable {
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
doingWork = true;
try {
- for (int i = 0; i < 20; i++) {
+ for (int i = 0; i < 60; i++) {
sleepOneSecond();
if (isDiscarded())
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 8af9e3f..05fc90f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -19,30 +19,26 @@
package org.apache.kylin.storage.gtrecord;
import java.io.IOException;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Array;
import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
-import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.dict.lookup.SnapshotManager;
-import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.dict.lookup.ILookupTable;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
-import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.Tuple;
import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.source.IReadableTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +63,7 @@ public class CubeTupleConverter implements ITupleConverter {
public final List<IAdvMeasureFiller> advMeasureFillers;
public final List<Integer> advMeasureIndexInGTValues;
+ private List<ILookupTable> usedLookupTables;
public final int nSelectedDims;
@@ -86,6 +83,7 @@ public class CubeTupleConverter implements ITupleConverter {
advMeasureFillers = Lists.newArrayListWithCapacity(1);
advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
+ usedLookupTables = Lists.newArrayList();
////////////
@@ -178,6 +176,17 @@ public class CubeTupleConverter implements ITupleConverter {
}
}
+ @Override
+ public void close() throws IOException {
+ for (ILookupTable usedLookupTable : usedLookupTables) {
+ try {
+ usedLookupTable.close();
+ } catch (Exception e) {
+ logger.error("error when close lookup table:" + usedLookupTable);
+ }
+ }
+ }
+
protected interface IDerivedColumnFiller {
public void fillDerivedColumns(Object[] gtValues, Tuple tuple);
}
@@ -204,7 +213,7 @@ public class CubeTupleConverter implements ITupleConverter {
switch (deriveInfo.type) {
case LOOKUP:
return new IDerivedColumnFiller() {
- LookupStringTable lookupTable = getLookupTable(cubeSeg, deriveInfo.join);
+ ILookupTable lookupTable = getAndAddLookupTable(cubeSeg, deriveInfo.join);
int[] derivedColIdx = initDerivedColIdx();
Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]);
@@ -263,40 +272,10 @@ public class CubeTupleConverter implements ITupleConverter {
return -1;
}
- public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
- long ts = System.currentTimeMillis();
-
- TableMetadataManager metaMgr = TableMetadataManager.getInstance(cubeSeg.getCubeInstance().getConfig());
- SnapshotManager snapshotMgr = SnapshotManager.getInstance(cubeSeg.getCubeInstance().getConfig());
-
- String tableName = join.getPKSide().getTableIdentity();
- String[] pkCols = join.getPrimaryKey();
- String snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
- if (snapshotResPath == null)
- throw new IllegalStateException("No snaphot for table '" + tableName + "' found on cube segment" + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
-
- try {
- SnapshotTable snapshot = snapshotMgr.getSnapshotTable(snapshotResPath);
- TableDesc tableDesc = metaMgr.getTableDesc(tableName, cubeSegment.getProject());
- EnhancedStringLookupTable enhancedStringLookupTable = new EnhancedStringLookupTable(tableDesc, pkCols, snapshot);
- logger.info("Time to get lookup up table for {} is {} ", join.getPKSide().getTableName(), (System.currentTimeMillis() - ts));
- return enhancedStringLookupTable;
- } catch (IOException e) {
- throw new IllegalStateException("Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
- }
- }
-
- public static class EnhancedStringLookupTable extends LookupStringTable {
-
- public EnhancedStringLookupTable(TableDesc tableDesc, String[] keyColumns, IReadableTable table) throws IOException {
- super(tableDesc, keyColumns, table);
- }
-
- @Override
- protected void init() throws IOException {
- this.data = new HashMap<>();
- super.init();
- }
+    /**
+     * Obtains the lookup table for {@code join} on {@code cubeSegment} from the CubeManager of the
+     * current environment config, and remembers it in {@code usedLookupTables} so that
+     * {@link #close()} releases it.
+     */
+    public ILookupTable getAndAddLookupTable(CubeSegment cubeSegment, JoinDesc join) {
+        CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+        ILookupTable table = cubeManager.getLookupTable(cubeSegment, join);
+        usedLookupTables.add(table);
+        return table;
}
private static String toString(Object o) {
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 2f69b76..5e11e91 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -18,6 +18,7 @@
package org.apache.kylin.storage.gtrecord;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -36,7 +37,7 @@ import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.cube.model.RowKeyColDesc;
-import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.dict.lookup.ILookupTable;
import org.apache.kylin.gridtable.StorageLimitLevel;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.measure.bitmap.BitmapMeasureType;
@@ -363,9 +364,16 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
return compf;
DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
- LookupStringTable lookup = cubeDesc.getHostInfo(derived).type == CubeDesc.DeriveType.PK_FK ? null
+ ILookupTable lookup = cubeDesc.getHostInfo(derived).type == CubeDesc.DeriveType.PK_FK ? null
: getLookupStringTableForDerived(derived, hostInfo);
Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
+ try {
+ if (lookup != null) {
+ lookup.close();
+ }
+ } catch (IOException e) {
+ logger.error("error when close lookup table.", e);
+ }
TupleFilter translatedFilter = translated.getFirst();
boolean loosened = translated.getSecond();
if (loosened) {
@@ -375,7 +383,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
@SuppressWarnings("unchecked")
- protected LookupStringTable getLookupStringTableForDerived(TblColRef derived, DeriveInfo hostInfo) {
+ protected ILookupTable getLookupStringTableForDerived(TblColRef derived, DeriveInfo hostInfo) {
CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
CubeSegment seg = cubeInstance.getLatestReadySegment();
return cubeMgr.getLookupTable(seg, hostInfo.join);
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
index dd48e4d..72e6848 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
@@ -18,12 +18,13 @@
package org.apache.kylin.storage.gtrecord;
+import java.io.Closeable;
import java.util.List;
import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
import org.apache.kylin.metadata.tuple.Tuple;
-public interface ITupleConverter {
+public interface ITupleConverter extends Closeable {
public List<IAdvMeasureFiller> translateResult(Object[] gtValues, Tuple tuple);
}
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
index 3bac5ec..a6e0bc1 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
@@ -202,6 +202,7 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
protected void close(CubeSegmentScanner scanner) {
try {
scanner.close();
+ cubeTupleConverter.close();
} catch (IOException e) {
logger.error("Exception when close CubeScanner", e);
}
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
index f4150fe..7fa426f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
@@ -27,7 +27,7 @@ import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.kv.RowKeyColumnOrder;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.cube.model.CubeDesc.DeriveType;
-import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.dict.lookup.ILookupTable;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -50,7 +50,7 @@ public class DerivedFilterTranslator {
private static final Logger logger = LoggerFactory.getLogger(DerivedFilterTranslator.class);
- public static Pair<TupleFilter, Boolean> translate(LookupStringTable lookup, DeriveInfo hostInfo, CompareTupleFilter compf) {
+ public static Pair<TupleFilter, Boolean> translate(ILookupTable lookup, DeriveInfo hostInfo, CompareTupleFilter compf) {
TblColRef derivedCol = compf.getColumn();
TblColRef[] hostCols = hostInfo.columns;
@@ -76,7 +76,7 @@ public class DerivedFilterTranslator {
Set<Array<String>> satisfyingHostRecords = Sets.newHashSet();
SingleColumnTuple tuple = new SingleColumnTuple(derivedCol);
- for (String[] row : lookup.getAllRows()) {
+ for (String[] row : lookup) {
tuple.value = row[di];
if (compf.evaluate(tuple, FilterCodeSystemFactory.getFilterCodeSystem(derivedCol.getColumnDesc().getType()))) {
collect(row, pi, satisfyingHostRecords);
diff --git a/pom.xml b/pom.xml
index 159a251..25a0297 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,6 +114,7 @@
<tomcat.version>7.0.85</tomcat.version>
<t-digest.version>3.1</t-digest.version>
<freemarker.version>2.3.23</freemarker.version>
+ <rocksdb.version>5.9.2</rocksdb.version>
<!--metric-->
<dropwizard.version>3.1.2</dropwizard.version>
<!-- REST Service, ref https://github.com/spring-projects/spring-boot/blob/v1.3.8.RELEASE/spring-boot-dependencies/pom.xml -->
@@ -665,6 +666,11 @@
<artifactId>freemarker</artifactId>
<version>${freemarker.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.rocksdb</groupId>
+ <artifactId>rocksdbjni</artifactId>
+ <version>${rocksdb.version}</version>
+ </dependency>
<!-- Logging -->
<dependency>
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
index 1500def..c3407bc 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
@@ -18,8 +18,8 @@
package org.apache.kylin.query.enumerator;
+import java.io.IOException;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -27,19 +27,22 @@ import org.apache.calcite.linq4j.Enumerator;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.dict.lookup.ILookupTable;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.tuple.Tuple;
import org.apache.kylin.query.relnode.OLAPContext;
import org.apache.kylin.query.schema.OLAPTable;
import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
*/
public class LookupTableEnumerator implements Enumerator<Object[]> {
+ private final static Logger logger = LoggerFactory.getLogger(LookupTableEnumerator.class);
- private final Collection<String[]> allRows;
+ private final ILookupTable lookupTable;
private final List<ColumnDesc> colDescs;
private final Object[] current;
private Iterator<String[]> iterator;
@@ -67,8 +70,7 @@ public class LookupTableEnumerator implements Enumerator<Object[]> {
throw new IllegalStateException("No dimension with derived columns found for lookup table " + lookupTableName + ", cube desc " + cube.getDescriptor());
CubeManager cubeMgr = CubeManager.getInstance(cube.getConfig());
- LookupStringTable table = cubeMgr.getLookupTable(cube.getLatestReadySegment(), dim.getJoin());
- this.allRows = table.getAllRows();
+ this.lookupTable = cubeMgr.getLookupTable(cube.getLatestReadySegment(), dim.getJoin());
OLAPTable olapTable = (OLAPTable) olapContext.firstTableScan.getOlapTable();
this.colDescs = olapTable.getSourceColumns();
@@ -103,11 +105,16 @@ public class LookupTableEnumerator implements Enumerator<Object[]> {
@Override
public void reset() {
- this.iterator = allRows.iterator();
+ this.iterator = lookupTable.iterator();
}
@Override
public void close() {
+ try {
+ lookupTable.close();
+ } catch (IOException e) {
+ logger.error("error when close lookup table", e);
+ }
}
}
--
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.