You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/04/16 11:43:22 UTC
[1/7] kylin git commit: KYLIN-2506 Refactor Global Dictionary
Repository: kylin
Updated Branches:
refs/heads/master-KYLIN-2506 [created] 06bf2a162
http://git-wip-us.apache.org/repos/asf/kylin/blob/bc6a1c3d/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
deleted file mode 100644
index 3c29d9c..0000000
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.dict;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Test;
-
-/**
- * Created by sunyerui on 16/7/12.
- */
-public class CachedTreeMapTest {
-
- public static class Key implements WritableComparable {
- int keyInt;
-
- public static Key of(int keyInt) {
- Key newKey = new Key();
- newKey.keyInt = keyInt;
- return newKey;
- }
-
- @Override
- public int compareTo(Object o) {
- return keyInt - ((Key)o).keyInt;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(keyInt);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- keyInt = in.readInt();
- }
-
- @Override
- public String toString() {
- return String.valueOf(keyInt);
- }
- }
-
- public static boolean VALUE_WRITE_ERROR_TOGGLE = false;
- public static class Value implements Writable {
- String valueStr;
-
- public static Value of(String valueStr) {
- Value newValue = new Value();
- newValue.valueStr = valueStr;
- return newValue;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- if (VALUE_WRITE_ERROR_TOGGLE) {
- out.write(new byte[0]);
- return;
- }
- out.writeUTF(valueStr);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- valueStr = in.readUTF();
- }
- }
-
- public static class CachedFileFilter implements FileFilter {
- @Override
- public boolean accept(File pathname) {
- return pathname.getName().startsWith(CachedTreeMap.CACHED_PREFIX);
- }
- }
-
- public static class VersionFilter implements FileFilter {
- @Override
- public boolean accept(File pathname) {
- return pathname.getName().startsWith(CachedTreeMap.VERSION_PREFIX);
- }
- }
-
-
- static final UUID uuid = UUID.randomUUID();
- static final String baseDir = "/tmp/kylin_cachedtreemap_test/" + uuid;
- static final String workingDir = baseDir + "/working";
-
- private static void cleanup() {
- Path basePath = new Path(baseDir);
- try {
- HadoopUtil.getFileSystem(basePath).delete(basePath, true);
- } catch (IOException e) {}
- VALUE_WRITE_ERROR_TOGGLE = false;
- }
-
- @After
- public void afterTest() {
- cleanup();
- }
-
- @AfterClass
- public static void tearDown() {
- cleanup();
- }
-
- @Test
- public void testCachedTreeMap() throws IOException {
- CachedTreeMap map = createMutableMap();
- map.put(Key.of(1), Value.of("a"));
- map.put(Key.of(2), Value.of("b"));
- map.put(Key.of(3), Value.of("c"));
- map.put(Key.of(4), Value.of("d"));
- map.put(Key.of(5), Value.of("e"));
-
- File dir = new File(workingDir);
- assertEquals(3, dir.listFiles(new CachedFileFilter()).length);
-
- flushAndCommit(map, true, true, false);
- assertFalse(new File(workingDir).exists());
-
- dir = new File(map.getLatestVersion());
- assertEquals(5, dir.listFiles(new CachedFileFilter()).length);
-
- CachedTreeMap map2 = createImmutableMap();
- assertEquals(5, map2.size());
- assertEquals("b", ((Value)map2.get(Key.of(2))).valueStr);
-
- try {
- map2.put(Key.of(6), Value.of("f"));
- fail("Should be error when put value into immutable map");
- } catch (AssertionError error) {}
- }
-
- @Test
- public void testMultiVersions() throws IOException, InterruptedException {
- CachedTreeMap map = createMutableMap();
- Thread.sleep(3000);
- map.put(Key.of(1), Value.of("a"));
- map.put(Key.of(2), Value.of("b"));
- map.put(Key.of(3), Value.of("c"));
- flushAndCommit(map, true, true, false);
-
- CachedTreeMap map2 = createImmutableMap();
- assertEquals("b", ((Value)map2.get(Key.of(2))).valueStr);
-
- // re-open dict, append new data
- map = createMutableMap();
- map.put(Key.of(4), Value.of("d"));
- flushAndCommit(map, true, true, true);
-
- // new data is not visible for map2
- assertNull(map2.get(Key.of(4)));
-
- // append data, and be visible for new immutable map
- map.put(Key.of(5), Value.of("e"));
- flushAndCommit(map, true, true, true);
-
- CachedTreeMap map3 = createImmutableMap();
- assertEquals("d", ((Value)map3.get(Key.of(4))).valueStr);
- assertEquals("e", ((Value)map3.get(Key.of(5))).valueStr);
-
- // Check versions retention
- File dir = new File(baseDir);
- assertEquals(3, dir.listFiles(new VersionFilter()).length);
- }
-
- @Test
- public void testKeepAppend() throws IOException {
- CachedTreeMap map = createMutableMap();
- map.put(Key.of(1), Value.of("a"));
- map.put(Key.of(2), Value.of("b"));
- map.put(Key.of(3), Value.of("c"));
- map.put(Key.of(4), Value.of("d"));
- map.put(Key.of(5), Value.of("e"));
-
- // flush with keepAppend false, map can't be append
- flushAndCommit(map, true, true, false);
- // append into map has closed
- try {
- map.put(Key.of(6), Value.of("f"));
- fail();
- } catch (AssertionError e) {
- assertEquals("Only support put method with immutable false and keepAppend true", e.getMessage());
- }
-
- CachedTreeMap map2 = createImmutableMap();
- assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
- assertEquals("d", ((Value)map2.get(Key.of(4))).valueStr);
- assertEquals("e", ((Value)map2.get(Key.of(5))).valueStr);
-
- map = createMutableMap();
- map.put(Key.of(6), Value.of("f"));
- map.put(Key.of(7), Value.of("g"));
- map.put(Key.of(8), Value.of("h"));
- // flush with keepAppend true
- flushAndCommit(map, true, true, true);
- map.put(Key.of(9), Value.of("i"));
- // can still append data
- flushAndCommit(map, true, true, false);
-
- map2 = createImmutableMap();
- assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
- assertEquals("d", ((Value)map2.get(Key.of(4))).valueStr);
- assertEquals("f", ((Value)map2.get(Key.of(6))).valueStr);
- assertEquals("i", ((Value)map2.get(Key.of(9))).valueStr);
- }
-
- @Test
- public void testVersionRetention() throws IOException, InterruptedException {
- File dir = new File(baseDir);
- // TTL for 3s and keep 3 versions
- CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir)
- .immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class)
- .maxVersions(3).versionTTL(1000 * 3).build();
- map.put(Key.of(1), Value.of("a"));
-
- // has version 0 when create map
- assertEquals(1, dir.listFiles(new VersionFilter()).length);
- Thread.sleep(2500);
-
- // flush version 1
- flushAndCommit(map, true, true, true);
- assertEquals(2, dir.listFiles(new VersionFilter()).length);
-
- // flush version 2
- flushAndCommit(map, true, true, true);
- assertEquals(3, dir.listFiles(new VersionFilter()).length);
-
- // flush version 3
- flushAndCommit(map, true, true, true);
- // won't delete version since 3s TTL
- assertEquals(4, dir.listFiles(new VersionFilter()).length);
-
- // sleep to make version 0 expired
- Thread.sleep(500);
- // flush verion 4
- flushAndCommit(map, true, true, false);
- assertEquals(4, dir.listFiles(new VersionFilter()).length);
-
- // TTL for 100ms and keep 2 versions
- map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir)
- .immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class)
- .maxVersions(2).versionTTL(100).build();
- flushAndCommit(map, true, true, false);
- assertEquals(2, dir.listFiles(new VersionFilter()).length);
- }
-
- @Test
- public void testWithOldFormat() throws IOException {
- File dir = new File(baseDir);
- CachedTreeMap map = createMutableMap();
- map.put(Key.of(1), Value.of("a"));
- map.put(Key.of(2), Value.of("b"));
- map.put(Key.of(3), Value.of("c"));
- map.put(Key.of(4), Value.of("d"));
- map.put(Key.of(5), Value.of("e"));
- flushAndCommit(map, true, true, true);
-
- // move version dir to base dir, to simulate the older format
- Path versionPath = new Path(map.getLatestVersion());
- Path tmpVersionPath = new Path(versionPath.getParent().getParent(), versionPath.getName());
- FileSystem fs = HadoopUtil.getFileSystem(versionPath);
- fs.rename(versionPath, tmpVersionPath);
- fs.delete(new Path(baseDir), true);
- fs.rename(tmpVersionPath, new Path(baseDir));
- assertEquals(0, dir.listFiles(new VersionFilter()).length);
- assertEquals(5, dir.listFiles(new CachedFileFilter()).length);
-
- CachedTreeMap map2 = createImmutableMap();
- assertEquals(5, map2.size());
- assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
- assertEquals("e", ((Value)map2.get(Key.of(5))).valueStr);
-
- assertEquals(1, dir.listFiles(new VersionFilter()).length);
- assertEquals(0, dir.listFiles(new CachedFileFilter()).length);
- }
-
- @Test
- public void testWriteFailed() throws IOException {
- // normal case
- CachedTreeMap map = createMutableMap();
- map.put(Key.of(1), Value.of("a"));
- map.put(Key.of(2), Value.of("b"));
- map.put(Key.of(3), Value.of("c"));
- map.remove(Key.of(3));
- map.put(Key.of(4), Value.of("d"));
-
- flushAndCommit(map, true, true, false);
-
- CachedTreeMap map2 = createImmutableMap();
- assertEquals(3, map2.size());
- assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
-
- // suppose write value failed and didn't commit data
- map = createMutableMap();
- VALUE_WRITE_ERROR_TOGGLE = true;
- map.put(Key.of(1), Value.of("aa"));
- map.put(Key.of(2), Value.of("bb"));
- VALUE_WRITE_ERROR_TOGGLE = false;
- map.put(Key.of(3), Value.of("cc"));
- map.put(Key.of(4), Value.of("dd"));
- // suppose write value failed and didn't commit data
- flushAndCommit(map, true, false, false);
-
- // read map data should not be modified
- map2 = createImmutableMap();
- assertEquals(3, map2.size());
- assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
-
- assertTrue(new File(workingDir).exists());
- }
-
- private CachedTreeMap createImmutableMap() throws IOException {
- CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir)
- .immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
- try (DataInputStream in = map.openIndexInput()) {
- map.readFields(in);
- }
- return map;
- }
-
- private CachedTreeMap createMutableMap() throws IOException {
- CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir)
- .immutable(false).maxSize(2).maxVersions(3).versionTTL(1000 * 3).keyClazz(Key.class).valueClazz(Value.class).build();
- try (DataInputStream in = map.openIndexInput()) {
- map.readFields(in);
- } catch (IOException e) {}
- return map;
- }
-
- private void flushAndCommit(CachedTreeMap map, boolean doFlush, boolean doCommit, boolean keepAppend) throws IOException {
- if (doFlush) {
- try (DataOutputStream out = map.openIndexOutput()) {
- map.write(out);
- }
- }
-
- if (doCommit) {
- map.commit(keepAppend);
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/kylin/blob/bc6a1c3d/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
index daa1053..08a8cb0 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
@@ -156,9 +156,6 @@ public class KylinKryoRegistrator implements KryoRegistrator {
kyroClasses.add(org.apache.kylin.cube.model.SelectRule.class);
kyroClasses.add(org.apache.kylin.cube.model.v1_4_0.CubeDesc.class);
kyroClasses.add(org.apache.kylin.dict.AppendTrieDictionary.class);
- kyroClasses.add(org.apache.kylin.dict.AppendTrieDictionary.DictNode.class);
- kyroClasses.add(org.apache.kylin.dict.AppendTrieDictionary.DictSlice.class);
- kyroClasses.add(org.apache.kylin.dict.AppendTrieDictionary.DictSliceKey.class);
kyroClasses.add(org.apache.kylin.dict.CacheDictionary.class);
kyroClasses.add(org.apache.kylin.dict.DateStrDictionary.class);
kyroClasses.add(org.apache.kylin.dict.DictionaryInfo.class);
[2/7] kylin git commit: KYLIN-2506 Refactor Global Dictionary
Posted by li...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/bc6a1c3d/core-dictionary/src/main/java/org/apache/kylin/dict/DictNode.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictNode.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictNode.java
new file mode 100644
index 0000000..dd9593a
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictNode.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+
+public class DictNode {
+ public byte[] part;
+ public int id = -1;
+ public boolean isEndOfValue;
+ public ArrayList<DictNode> children = new ArrayList<>();
+
+ public int nValuesBeneath;
+ public DictNode parent;
+ public int childrenCount = 1;
+
+ /**
+  * Creates a node that has no children.
+  *
+  * @param value        bytes stored as this node's {@code part}
+  * @param isEndOfValue whether a dictionary value ends at this node
+  */
+ DictNode(byte[] value, boolean isEndOfValue) {
+     this.reset(value, isEndOfValue);
+ }
+
+ /**
+  * Creates a node and attaches every element of {@code children} to it, in list order.
+  *
+  * @param value        bytes stored as this node's {@code part}
+  * @param isEndOfValue whether a dictionary value ends at this node
+  * @param children     nodes to adopt; each one gets this node as its parent
+  */
+ DictNode(byte[] value, boolean isEndOfValue, ArrayList<DictNode> children) {
+     this.reset(value, isEndOfValue, children);
+ }
+
+ /**
+  * Re-initializes this node with the given bytes and flag and with no children.
+  *
+  * @param value        bytes stored as this node's {@code part}
+  * @param isEndOfValue whether a dictionary value ends at this node
+  */
+ void reset(byte[] value, boolean isEndOfValue) {
+     ArrayList<DictNode> noChildren = new ArrayList<DictNode>();
+     reset(value, isEndOfValue, noChildren);
+ }
+
+ /**
+  * Re-initializes this node: stores the new bytes and flag, drops the current children,
+  * adopts the given ones in list order and finally marks the id as unassigned ({@code -1}).
+  *
+  * @param value        bytes stored as this node's {@code part} (the array is kept, not copied)
+  * @param isEndOfValue whether a dictionary value ends at this node
+  * @param children     nodes to adopt after the existing children have been cleared
+  */
+ void reset(byte[] value, boolean isEndOfValue, ArrayList<DictNode> children) {
+     this.part = value;
+     this.isEndOfValue = isEndOfValue;
+     // Clear first so the subtree counters of all ancestors are brought back in line
+     // before the new children are counted in.
+     clearChild();
+     for (DictNode adopted : children) {
+         addChild(adopted);
+     }
+     this.id = -1;
+ }
+
+ void clearChild() {
+ this.children.clear();
+ int childrenCountDelta = this.childrenCount - 1;
+ for (DictNode p = this; p != null; p = p.parent) {
+ p.childrenCount -= childrenCountDelta;
+ }
+ }
+
+ void addChild(DictNode child) {
+ addChild(-1, child);
+ }
+
+ /**
+  * Attaches {@code child} to this node and adds its {@code childrenCount} to this node
+  * and to every ancestor.
+  *
+  * @param index position in the child list; a negative value appends at the end
+  * @param child node to attach; its {@code parent} is set to this node
+  */
+ void addChild(int index, DictNode child) {
+     child.parent = this;
+     if (index >= 0) {
+         this.children.add(index, child);
+     } else {
+         this.children.add(child);
+     }
+     DictNode ancestor = this;
+     while (ancestor != null) {
+         ancestor.childrenCount += child.childrenCount;
+         ancestor = ancestor.parent;
+     }
+ }
+
+ /**
+  * Detaches the child at {@code index} and subtracts its {@code childrenCount} from this
+  * node and from every ancestor.
+  *
+  * @param index position of the child to detach
+  * @return the detached child, whose {@code parent} is now {@code null}
+  */
+ private DictNode removeChild(int index) {
+     DictNode detached = children.remove(index);
+     detached.parent = null;
+     DictNode ancestor = this;
+     while (ancestor != null) {
+         ancestor.childrenCount -= detached.childrenCount;
+         ancestor = ancestor.parent;
+     }
+     return detached;
+ }
+
+ /**
+  * Creates a childless sibling that shares this node's {@code part} array and is not an
+  * end-of-value node. When this node has a parent, the sibling is inserted into the
+  * parent's child list directly after this node.
+  *
+  * @return the newly created sibling
+  */
+ private DictNode duplicateNode() {
+     DictNode sibling = new DictNode(part, false);
+     sibling.parent = parent;
+     if (parent == null) {
+         return sibling;
+     }
+     int ownIndex = parent.children.indexOf(this);
+     parent.addChild(ownIndex + 1, sibling);
+     return sibling;
+ }
+
+ /**
+  * Builds a byte sequence by concatenating {@code part} of this node and of each first
+  * child below it, stopping at the first node that is end-of-value or has no children.
+  *
+  * @return the concatenated bytes
+  */
+ public byte[] firstValue() {
+     ByteArrayOutputStream collected = new ByteArrayOutputStream();
+     DictNode node = this;
+     collected.write(node.part, 0, node.part.length);
+     while (!node.isEndOfValue && node.children.size() != 0) {
+         node = node.children.get(0);
+         collected.write(node.part, 0, node.part.length);
+     }
+     return collected.toByteArray();
+ }
+
+ /**
+  * Splits the tree along the path from {@code splitNode} up to the root. At every
+  * ancestor a duplicate sibling is created, and the children from the path node onwards
+  * are moved to that duplicate, keeping their relative order.
+  *
+  * @param splitNode first node that should end up in the split-off part; may be {@code null}
+  * @return {@code null} for a {@code null} argument, {@code splitNode} itself when it has
+  *         no parent, otherwise the duplicate created for the topmost ancestor
+  */
+ public static DictNode splitNodeTree(final DictNode splitNode) {
+     if (splitNode == null) {
+         return null;
+     }
+     DictNode pathNode = splitNode;
+     for (DictNode ancestor = pathNode.parent; ancestor != null; ancestor = ancestor.parent) {
+         int splitIndex = ancestor.children.indexOf(pathNode);
+         assert splitIndex != -1;
+         DictNode twin = ancestor.duplicateNode();
+         // Walk backwards and always insert at the front so the moved children keep their order.
+         for (int i = ancestor.children.size() - 1; i >= splitIndex; i--) {
+             DictNode moved = ancestor.removeChild(i);
+             twin.addChild(0, moved);
+         }
+         pathNode = twin;
+     }
+     return pathNode;
+ }
+
+ /**
+  * Serializes the trie rooted at this node into a single byte array.
+  *
+  * <p>The array starts with a head (magic bytes, a 2-byte head size that is back-filled,
+  * the body size, the number of values, then one byte each for the child-offset size and
+  * the id size) followed by the body. Nodes are written breadth-first so that the
+  * children of one parent are contiguous; once a parent's children are about to be
+  * written, the parent's child-offset field is overwritten with the position of its first
+  * child relative to the start of the body.
+  *
+  * @return head and body of the serialized trie
+  * @throws IllegalStateException if the number of bytes written differs from the size
+  *         computed by {@link Stats#stats(DictNode)}
+  */
+ public byte[] buildTrieBytes() {
+     Stats stats = Stats.stats(this);
+     int sizeChildOffset = stats.mbpn_sizeChildOffset;
+     int sizeId = stats.mbpn_sizeId;
+
+     // write head
+     byte[] head;
+     try {
+         ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
+         DataOutputStream headOut = new DataOutputStream(byteBuf);
+         headOut.write(AppendTrieDictionary.HEAD_MAGIC);
+         headOut.writeShort(0); // head size, will back fill
+         headOut.writeInt(stats.mbpn_footprint); // body size
+         headOut.writeInt(stats.nValues);
+         headOut.write(sizeChildOffset);
+         headOut.write(sizeId);
+         headOut.close();
+         head = byteBuf.toByteArray();
+         BytesUtil.writeUnsigned(head.length, head, AppendTrieDictionary.HEAD_SIZE_I, 2);
+     } catch (IOException e) {
+         // shall not happen, as we are writing to an in-memory buffer
+         throw new RuntimeException(e);
+     }
+
+     byte[] trieBytes = new byte[stats.mbpn_footprint + head.length];
+     System.arraycopy(head, 0, trieBytes, 0, head.length);
+
+     // nodes whose children still have to be written, in breadth-first order
+     LinkedList<DictNode> open = new LinkedList<DictNode>();
+     // absolute offset in trieBytes at which each node was written
+     IdentityHashMap<DictNode, Integer> offsetMap = new IdentityHashMap<DictNode, Integer>();
+
+     // write body
+     int o = head.length;
+     offsetMap.put(this, o);
+     o = build_writeNode(this, o, true, sizeChildOffset, sizeId, trieBytes);
+     if (!this.children.isEmpty()) {
+         open.addLast(this);
+     }
+
+     while (!open.isEmpty()) {
+         DictNode parent = open.removeFirst();
+         // the parent's children start right here; the offset is relative to the body start
+         build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes);
+         for (int i = 0; i < parent.children.size(); i++) {
+             DictNode c = parent.children.get(i);
+             boolean isLastChild = (i == parent.children.size() - 1);
+             offsetMap.put(c, o);
+             o = build_writeNode(c, o, isLastChild, sizeChildOffset, sizeId, trieBytes);
+             if (!c.children.isEmpty()) {
+                 open.addLast(c);
+             }
+         }
+     }
+
+     if (o != trieBytes.length) {
+         throw new IllegalStateException("Serialized trie ends at offset " + o + " but " + trieBytes.length + " bytes were allocated");
+     }
+     return trieBytes;
+ }
+
+ private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) {
+ int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE);
+ BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset);
+ trieBytes[parentOffset] |= flags;
+ }
+
+ /**
+  * Writes one node into {@code trieBytes} starting at {@code offset}. The record consists
+  * of the child-offset field (only the flag bits are set here), one byte holding the
+  * length of {@code part}, the bytes of {@code part} and, for end-of-value nodes, the id.
+  *
+  * @param n               node to write
+  * @param offset          position at which the record starts
+  * @param isLastChild     whether the node is the last child of its parent
+  * @param sizeChildOffset width of the child-offset field in bytes
+  * @param sizeId          width of the id field in bytes
+  * @param trieBytes       buffer being filled
+  * @return the position directly after the written record
+  * @throws RuntimeException         if {@code part} is longer than 255 bytes
+  * @throws IllegalArgumentException if an end-of-value node carries the id 0 or -1
+  */
+ private int build_writeNode(DictNode n, int offset, boolean isLastChild, int sizeChildOffset, int sizeId, byte[] trieBytes) {
+     int cursor = offset;
+
+     // child-offset field: only the flags for now
+     if (isLastChild) {
+         trieBytes[cursor] |= TrieDictionary.BIT_IS_LAST_CHILD;
+     }
+     if (n.isEndOfValue) {
+         trieBytes[cursor] |= TrieDictionary.BIT_IS_END_OF_VALUE;
+     }
+     cursor += sizeChildOffset;
+
+     // length of the part, stored in a single byte
+     if (n.part.length > 255) {
+         throw new RuntimeException("Value length is " + n.part.length + " and larger than 255: " + Bytes.toStringBinary(n.part));
+     }
+     BytesUtil.writeUnsigned(n.part.length, trieBytes, cursor, 1);
+     cursor += 1;
+
+     // the part itself
+     System.arraycopy(n.part, 0, trieBytes, cursor, n.part.length);
+     cursor += n.part.length;
+
+     // id, present only where a value ends
+     if (n.isEndOfValue) {
+         checkValidId(n.id);
+         BytesUtil.writeUnsigned(n.id, trieBytes, cursor, sizeId);
+         cursor += sizeId;
+     }
+
+     return cursor;
+ }
+
+ /**
+  * Rejects the two reserved ids. Dictionary ids run from 1 to 2147483647 and then from
+  * 2147483648 (as an unsigned value) up to the bit pattern of -2; 0 and -1 are kept for
+  * the uninitialized state.
+  *
+  * @param id id to check
+  * @throws IllegalArgumentException if {@code id} is 0 or -1
+  */
+ private void checkValidId(int id) {
+     if (id != 0 && id != -1) {
+         return;
+     }
+     throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294");
+ }
+
+ @Override
+ public String toString() {
+ return String.format("DictNode[root=%s, nodes=%d, firstValue=%s]", Bytes.toStringBinary(part), childrenCount, Bytes.toStringBinary(firstValue()));
+ }
+
+ static class Stats {
+ /**
+  * Callback invoked once per node by the traversal helpers of {@link Stats}.
+  */
+ public interface Visitor {
+     /**
+      * @param n     node being visited
+      * @param level depth of the node, with the traversal root at level 0
+      */
+     void visit(DictNode n, int level);
+ }
+
+ private static void traverseR(DictNode node, Visitor visitor, int level) {
+ visitor.visit(node, level);
+ for (DictNode c : node.children)
+ traverseR(c, visitor, level + 1);
+ }
+
+ private static void traversePostOrderR(DictNode node, Visitor visitor, int level) {
+ for (DictNode c : node.children)
+ traversePostOrderR(c, visitor, level + 1);
+ visitor.visit(node, level);
+ }
+
+ public int nValues; // number of values in total
+ public int nValueBytesPlain; // number of bytes for all values
+ // uncompressed
+ public int nValueBytesCompressed; // number of values bytes in Trie
+ // (compressed)
+ public int maxValueLength; // size of longest value in bytes
+
+ // the trie is multi-byte-per-node
+ public int mbpn_nNodes; // number of nodes in trie
+ public int mbpn_trieDepth; // depth of trie
+ public int mbpn_maxFanOut; // the maximum no. children
+ public int mbpn_nChildLookups; // number of child lookups during lookup
+ // every value once
+ public int mbpn_nTotalFanOut; // the sum of fan outs during lookup every
+ // value once
+ public int mbpn_sizeValueTotal; // the sum of value space in all nodes
+ public int mbpn_sizeNoValueBytes; // size of field noValueBytes
+ public int mbpn_sizeChildOffset; // size of field childOffset, points to
+ // first child in flattened array
+ public int mbpn_sizeId; // size of id value, always be 4
+ public int mbpn_footprint; // MBPN footprint in bytes
+
+ /**
+ * out print some statistics of the trie and the dictionary built from it
+ */
+ public static Stats stats(DictNode root) {
+ // calculate nEndValueBeneath
+ traversePostOrderR(root, new Visitor() {
+ @Override
+ public void visit(DictNode n, int level) {
+ n.nValuesBeneath = n.isEndOfValue ? 1 : 0;
+ for (DictNode c : n.children)
+ n.nValuesBeneath += c.nValuesBeneath;
+ }
+ }, 0);
+
+ // run stats
+ final Stats s = new Stats();
+ final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>();
+ traverseR(root, new Visitor() {
+ @Override
+ public void visit(DictNode n, int level) {
+ if (n.isEndOfValue)
+ s.nValues++;
+ s.nValueBytesPlain += n.part.length * n.nValuesBeneath;
+ s.nValueBytesCompressed += n.part.length;
+ s.mbpn_nNodes++;
+ if (s.mbpn_trieDepth < level + 1)
+ s.mbpn_trieDepth = level + 1;
+ if (n.children.size() > 0) {
+ if (s.mbpn_maxFanOut < n.children.size())
+ s.mbpn_maxFanOut = n.children.size();
+ int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0);
+ s.mbpn_nChildLookups += childLookups;
+ s.mbpn_nTotalFanOut += childLookups * n.children.size();
+ }
+
+ if (level < lenAtLvl.size())
+ lenAtLvl.set(level, n.part.length);
+ else
+ lenAtLvl.add(n.part.length);
+ int lenSoFar = 0;
+ for (int i = 0; i <= level; i++)
+ lenSoFar += lenAtLvl.get(i);
+ if (lenSoFar > s.maxValueLength)
+ s.maxValueLength = lenSoFar;
+ }
+ }, 0);
+
+ // flatten trie footprint calculation, case of Multi-Byte-Per-DictNode
+ s.mbpn_sizeId = 4;
+ s.mbpn_sizeValueTotal = s.nValueBytesCompressed + s.nValues * s.mbpn_sizeId;
+ s.mbpn_sizeNoValueBytes = 1;
+ s.mbpn_sizeChildOffset = 5;
+ s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset);
+ while (true) { // minimize the offset size to match the footprint
+ int t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset - 1);
+ // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag
+ // expand t to long before *4, avoiding exceed Integer.MAX_VALUE
+ if (BytesUtil.sizeForValue((long) t * 4) <= s.mbpn_sizeChildOffset - 1) {
+ s.mbpn_sizeChildOffset--;
+ s.mbpn_footprint = t;
+ } else
+ break;
+ }
+
+ return s;
+ }
+
+ /**
+  * Prints the trie rooted at {@code root} to {@code System.out}, for debugging.
+  *
+  * @param root root of the trie to print
+  */
+ public void print(DictNode root) {
+     final PrintStream stdout = System.out;
+     print(root, stdout);
+ }
+
+ /**
+  * Prints the trie rooted at {@code root}, one node per line, for debugging. Each line
+  * is indented by the node's level and shows the node's part decoded as UTF-8, the value
+  * of {@code nValuesBeneath} when it is positive and, for end-of-value nodes, a
+  * {@code *} followed by the id.
+  *
+  * @param root root of the trie to print
+  * @param out  stream that receives the output
+  */
+ public void print(DictNode root, final PrintStream out) {
+     traverseR(root, new Visitor() {
+         @Override
+         public void visit(DictNode n, int level) {
+             for (int i = 0; i < level; i++) {
+                 out.print(" ");
+             }
+             // A Charset constant cannot fail to resolve, so no checked exception is possible here.
+             out.print(new String(n.part, StandardCharsets.UTF_8));
+             out.print(" - ");
+             if (n.nValuesBeneath > 0) {
+                 out.print(n.nValuesBeneath);
+             }
+             if (n.isEndOfValue) {
+                 out.print("* [" + n.id + "]");
+             }
+             out.print("\n");
+         }
+     }, 0);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/bc6a1c3d/core-dictionary/src/main/java/org/apache/kylin/dict/DictSlice.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictSlice.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictSlice.java
new file mode 100644
index 0000000..3ca0d8f
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictSlice.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+
+public class DictSlice {
+ static final byte[] HEAD_MAGIC = new byte[] { 0x41, 0x70, 0x70, 0x65, 0x63, 0x64, 0x54, 0x72, 0x69, 0x65, 0x44, 0x69, 0x63, 0x74 }; // ASCII "AppecdTrieDict": the 5th byte is 0x63 ('c'), not 0x6E ('n'), so this does NOT spell "AppendTrieDict". NOTE(review): presumably must stay as-is to match slice files already written - confirm before changing
+ static final int HEAD_SIZE_I = HEAD_MAGIC.length;
+ static final int BIT_IS_LAST_CHILD = 0x80; // flag bit tested on the first byte of a node (see checkFlag)
+ static final int BIT_IS_END_OF_VALUE = 0x40; // flag bit tested on the first byte of a node (see checkFlag)
+
+ private byte[] trieBytes;
+
+ // non-persistent part, decoded by init() from the header that follows the magic in trieBytes
+ transient private int headSize;
+ transient private int bodyLen;
+ transient private int sizeChildOffset;
+
+ transient private int nValues;
+ transient private int sizeOfId;
+ // mask MUST be long, since the child offset may be up to 5 bytes wide
+ transient private long childOffsetMask;
+ transient private int firstByteOffset;
+
+ /**
+  * Wraps the serialized bytes of one slice and decodes its header.
+  *
+  * @param bytes magic, header and trie body; kept by reference, not copied
+  */
+ public DictSlice(byte[] bytes) {
+     this.trieBytes = bytes;
+     init();
+ }
+
+ /**
+  * Verifies the magic prefix and reads the header fields that follow it.
+  */
+ private void init() {
+     boolean magicMatches = BytesUtil.compareBytes(HEAD_MAGIC, 0, trieBytes, 0, HEAD_MAGIC.length) == 0;
+     if (!magicMatches)
+         throw new IllegalArgumentException("Wrong file type (magic does not match)");
+
+     try {
+         ByteArrayInputStream afterMagic = new ByteArrayInputStream(trieBytes, HEAD_SIZE_I, trieBytes.length - HEAD_SIZE_I);
+         DataInputStream header = new DataInputStream(afterMagic);
+         this.headSize = header.readShort();
+         this.bodyLen = header.readInt();
+         this.nValues = header.readInt();
+         this.sizeChildOffset = header.read();
+         this.sizeOfId = header.read();
+
+         // the two flag bits live in the most significant byte of the child offset
+         long flagBits = BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE;
+         this.childOffsetMask = ~(flagBits << ((sizeChildOffset - 1) * 8));
+         // distance from the start of a node to its first value byte
+         this.firstByteOffset = sizeChildOffset + 1;
+     } catch (IOException e) {
+         // IOException is the only checked exception the reads can raise;
+         // runtime exceptions propagate untouched
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+  * Reads one complete slice from {@code in}: first the fixed prefix (magic, head size, body
+  * length), then the remaining {@code headSize + bodyLen - prefix} bytes.
+  *
+  * @param in source positioned at the first magic byte
+  * @return the slice backed by the bytes just read
+  * @throws IOException if the input ends early or cannot be read
+  * @throws IllegalArgumentException if the magic does not match
+  */
+ public static DictSlice deserializeFrom(DataInput in) throws IOException {
+     final int prefixLen = HEAD_MAGIC.length + Short.SIZE / Byte.SIZE + Integer.SIZE / Byte.SIZE;
+     byte[] prefix = new byte[prefixLen];
+     in.readFully(prefix);
+
+     boolean magicMatches = BytesUtil.compareBytes(HEAD_MAGIC, 0, prefix, 0, HEAD_MAGIC.length) == 0;
+     if (!magicMatches)
+         throw new IllegalArgumentException("Wrong file type (magic does not match)");
+
+     ByteArrayInputStream afterMagic = new ByteArrayInputStream(prefix, HEAD_SIZE_I, prefixLen - HEAD_SIZE_I);
+     DataInputStream prefixIn = new DataInputStream(afterMagic);
+     int headSize = prefixIn.readShort();
+     int bodyLen = prefixIn.readInt();
+     prefixIn.close();
+
+     // re-assemble prefix + rest into one array, which is what the constructor expects
+     byte[] whole = new byte[headSize + bodyLen];
+     System.arraycopy(prefix, 0, whole, 0, prefixLen);
+     in.readFully(whole, prefixLen, whole.length - prefixLen);
+
+     return new DictSlice(whole);
+ }
+
+ /**
+  * Collects value bytes by starting at the node at {@code headSize} and repeatedly following
+  * the child offset, until a node flagged end-of-value is reached or a node has no child.
+  *
+  * @return the concatenated value bytes of the visited nodes
+  */
+ public byte[] getFirstValue() {
+     ByteArrayOutputStream collected = new ByteArrayOutputStream();
+     int node = headSize;
+     do {
+         int partLen = BytesUtil.readUnsigned(trieBytes, node + firstByteOffset - 1, 1);
+         collected.write(trieBytes, node + firstByteOffset, partLen);
+         if (checkFlag(node, BIT_IS_END_OF_VALUE)) {
+             break;
+         }
+         // a masked child offset of 0 maps back to headSize and ends the walk
+         node = headSize + (int) (BytesUtil.readLong(trieBytes, node, sizeChildOffset) & childOffsetMask);
+     } while (node != headSize);
+     return collected.toByteArray();
+ }
+
+ /**
+ * Walks the serialized trie starting at node n, matching inp[o, inpEnd) against the value bytes
+ * of the visited nodes, and returns the id stored right after the node at which the input ends.
+ *
+ * @param n -- the offset of current node
+ * @param inp -- input value bytes to lookup
+ * @param o -- offset in the input value bytes matched so far
+ * @param inpEnd -- end of input
+ * @param roundingFlag -- never read in this method: every miss returns -1 regardless of its value
+ * (no "closest smaller" / "closest bigger" rounding is performed here)
+ * @return the id read with BytesUtil.readUnsigned(trieBytes, end, sizeOfId) from the matching
+ * end-of-value node, or -1 if the input is not found
+ */
+ private int lookupSeqNoFromValue(int n, byte[] inp, int o, int inpEnd, int roundingFlag) {
+ while (true) {
+ // match the current node
+ int p = n + firstByteOffset; // start of node's value
+ int end = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1); // end of node's value
+ for (; p < end && o < inpEnd; p++, o++) { // note matching start from [0]
+ if (trieBytes[p] != inp[o]) {
+ return -1; // mismatch
+ }
+ }
+
+ // the node's value or the input is exhausted; if the input is exhausted it is a hit only
+ // when the node was matched to its end (p == end) and the node ends a value
+ boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
+ if (o == inpEnd) {
+ return p == end && isEndOfValue ? BytesUtil.readUnsigned(trieBytes, end, sizeOfId) : -1;
+ }
+
+ // find a child to continue
+ int c = headSize + (int) (BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask);
+ if (c == headSize) // has no children
+ return -1;
+ byte inpByte = inp[o];
+ int comp;
+ while (true) {
+ p = c + firstByteOffset;
+ comp = BytesUtil.compareByteUnsigned(trieBytes[p], inpByte);
+ if (comp == 0) { // continue in the matching child, reset n and loop again
+ n = c;
+ break;
+ } else if (comp < 0) { // try next child
+ if (checkFlag(c, BIT_IS_LAST_CHILD))
+ return -1;
+ // the next sibling starts right after this child's value bytes and, if present, its id
+ c = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1) + (checkFlag(c, BIT_IS_END_OF_VALUE) ? sizeOfId : 0);
+ } else { // children are ordered by their first value byte
+ return -1;
+ }
+ }
+ }
+ }
+
+ /** Tells whether {@code bit} is set in the byte at {@code offset} of the trie bytes. */
+ private boolean checkFlag(int offset, int bit) {
+     int masked = trieBytes[offset] & bit;
+     return masked > 0;
+ }
+
+ /**
+  * Looks up the id of {@code value[offset, offset + len)}, starting the search at the node at
+  * {@code headSize}.
+  *
+  * @return whatever {@code lookupSeqNoFromValue} returns for that range
+  */
+ public int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag) {
+     final int inputEnd = offset + len;
+     return lookupSeqNoFromValue(headSize, value, offset, inputEnd, roundingFlag);
+ }
+
+ /**
+  * Rebuilds the in-memory node tree from the serialized trie, starting at {@code headSize}.
+  *
+  * @return the node created at the top level
+  */
+ public DictNode rebuildTrieTree() {
+     return rebuildTrieTreeR(headSize, null);
+ }
+
+ /**
+  * Materializes the sibling run that starts at offset {@code n}, attaching each node to
+  * {@code parent} and recursing into its children.
+  *
+  * @param n      offset of the first sibling
+  * @param parent node to attach the siblings to, or null at the top level
+  * @return the last node created at the top level, or null when {@code parent} is not null
+  */
+ private DictNode rebuildTrieTreeR(int n, DictNode parent) {
+     DictNode topLevel = null;
+     int cur = n;
+     boolean moreSiblings = true;
+     while (moreSiblings) {
+         final int valueStart = cur + firstByteOffset;
+         final int childRel = (int) (BytesUtil.readLong(trieBytes, cur, sizeChildOffset) & childOffsetMask);
+         final int valueLen = BytesUtil.readUnsigned(trieBytes, valueStart - 1, 1);
+         final boolean endsValue = checkFlag(cur, BIT_IS_END_OF_VALUE);
+
+         byte[] part = new byte[valueLen];
+         System.arraycopy(trieBytes, valueStart, part, 0, valueLen);
+
+         DictNode node = new DictNode(part, endsValue);
+         if (endsValue) {
+             // the id is stored right behind the value bytes
+             node.id = BytesUtil.readUnsigned(trieBytes, valueStart + valueLen, sizeOfId);
+         }
+
+         if (parent != null) {
+             parent.addChild(node);
+         } else {
+             topLevel = node;
+         }
+
+         if (childRel != 0) {
+             rebuildTrieTreeR(childRel + headSize, node);
+         }
+
+         moreSiblings = !checkFlag(cur, BIT_IS_LAST_CHILD);
+         if (moreSiblings) {
+             cur += firstByteOffset + valueLen + (endsValue ? sizeOfId : 0);
+         }
+     }
+     return topLevel;
+ }
+
+ /**
+  * Scans all nodes sequentially from {@code headSize} and verifies the structure: every node
+  * must lie completely inside the byte array, and every node that follows a last-child node
+  * must be the recorded child offset of some earlier node.
+  * <p>
+  * Truncated data is reported as {@code false}. The node header and the id are bounds-checked
+  * explicitly before they are read, rather than relying on the array reads to fail.
+  *
+  * @return true if the slice looks structurally sound, false if it is corrupted
+  */
+ public boolean doCheck() {
+     int offset = headSize;
+     HashSet<Integer> parentSet = new HashSet<>();
+     boolean lastChild = false;
+
+     while (offset < trieBytes.length) {
+         if (lastChild) {
+             boolean contained = parentSet.remove(offset - headSize);
+             // Can't find parent, the data is corrupted
+             if (!contained) {
+                 return false;
+             }
+             lastChild = false;
+         }
+         int p = offset + firstByteOffset;
+         // The node header (child offset + value-length byte) occupies [offset, p);
+         // if it is cut off, the data is corrupted
+         if (p > trieBytes.length) {
+             return false;
+         }
+         int childOffset = (int) (BytesUtil.readLong(trieBytes, offset, sizeChildOffset) & childOffsetMask);
+         int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
+         boolean isEndOfValue = checkFlag(offset, BIT_IS_END_OF_VALUE);
+
+         // Value bytes (and the id behind them, if any) overflow, the data is corrupted
+         int nodeEnd = p + parLen + (isEndOfValue ? sizeOfId : 0);
+         if (nodeEnd > trieBytes.length) {
+             return false;
+         }
+
+         // Record it if has children
+         if (childOffset != 0) {
+             parentSet.add(childOffset);
+         }
+
+         // brothers done, move to next parent
+         if (checkFlag(offset, BIT_IS_LAST_CHILD)) {
+             lastChild = true;
+         }
+
+         // move to next node
+         offset = nodeEnd;
+     }
+
+     // parentSet is empty, meaning every recorded child offset was reached, the data is correct
+     return parentSet.isEmpty();
+ }
+
+ /** Describes the slice by its first value, value count and body length. */
+ @Override
+ public String toString() {
+     String firstValue = Bytes.toStringBinary(getFirstValue());
+     return String.format("DictSlice[firstValue=%s, values=%d, bytes=%d]", firstValue, nValues, bodyLen);
+ }
+
+ /** Hash over the raw trie bytes, consistent with {@link #equals(Object)}. */
+ @Override
+ public int hashCode() {
+     return Arrays.hashCode(trieBytes);
+ }
+
+ /** Two slices are equal when their raw trie bytes are equal. */
+ @Override
+ public boolean equals(Object o) {
+     if (o == this) {
+         return true;
+     }
+     return o instanceof DictSlice && Arrays.equals(this.trieBytes, ((DictSlice) o).trieBytes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/bc6a1c3d/core-dictionary/src/main/java/org/apache/kylin/dict/DictSliceKey.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictSliceKey.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictSliceKey.java
new file mode 100644
index 0000000..8fc3f78
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictSliceKey.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Bytes;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Key of a dictionary slice: a byte array compared with {@code Bytes.compareTo}, with
+ * array-content based equals/hashCode.
+ */
+public class DictSliceKey implements Comparable<DictSliceKey> {
+    static final DictSliceKey START_KEY = DictSliceKey.wrap(new byte[0]);
+
+    byte[] key;
+
+    /** Creates a key around {@code key}; the array is kept by reference, not copied. */
+    public static DictSliceKey wrap(byte[] key) {
+        DictSliceKey wrapped = new DictSliceKey();
+        wrapped.key = key;
+        return wrapped;
+    }
+
+    @Override
+    public String toString() {
+        return Bytes.toStringBinary(key);
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(key);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof DictSliceKey)) {
+            return false;
+        }
+        return Arrays.equals(this.key, ((DictSliceKey) o).key);
+    }
+
+    @Override
+    public int compareTo(DictSliceKey that) {
+        return Bytes.compareTo(key, that.key);
+    }
+
+    /** Writes the key as a 4-byte length followed by the raw bytes. */
+    public void write(DataOutput out) throws IOException {
+        final int length = key.length;
+        out.writeInt(length);
+        out.write(key);
+    }
+
+    /** Reads a key in the layout produced by {@link #write(DataOutput)}. */
+    public void readFields(DataInput in) throws IOException {
+        final int length = in.readInt();
+        key = new byte[length];
+        in.readFully(key);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/bc6a1c3d/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java
new file mode 100644
index 0000000..d9030d3
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+public class GlobalDictHDFSStore extends GlobalDictStore {
+
+ static final Logger logger = LoggerFactory.getLogger(GlobalDictHDFSStore.class);
+ static final String V1_INDEX_NAME = ".index"; // index file name used by IndexFormatV1
+ static final String V2_INDEX_NAME = ".index_v2"; // index file name used by IndexFormatV2
+ static final String VERSION_PREFIX = "version_"; // a version directory is named VERSION_PREFIX + <version as long>
+ static final int BUFFER_SIZE = 8 * 1024 * 1024; // 8 MiB, passed as buffer size when opening/creating slice files
+
+ private final Path basePath; // new Path(baseDir); parent of all version directories
+ private final Configuration conf;
+ private final FileSystem fileSystem;
+
+ protected GlobalDictHDFSStore(String baseDir) throws IOException {
+ super(baseDir);
+ this.basePath = new Path(baseDir);
+ this.conf = HadoopUtil.getCurrentConfiguration();
+ this.fileSystem = HadoopUtil.getFileSystem(baseDir);
+
+ if (!fileSystem.exists(basePath)) {
+ logger.info("Global dict at {} doesn't exist, create a new one", basePath);
+ fileSystem.mkdirs(basePath);
+ }
+
+ migrateOldLayout();
+ }
+
+ // Previously we put slice files and index file directly in base directory,
+ // should migrate to the new versioned layout.
+ // The old files are copied into a temp dir, the temp dir is renamed to a new version dir, and
+ // only after that rename succeeded are the old files deleted from the base directory.
+ private void migrateOldLayout() throws IOException {
+     FileStatus[] sliceFiles = fileSystem.listStatus(basePath, new PathFilter() {
+         @Override
+         public boolean accept(Path path) {
+             return path.getName().startsWith(IndexFormatV1.SLICE_PREFIX);
+         }
+     });
+     Path indexFile = new Path(basePath, V1_INDEX_NAME);
+
+     if (fileSystem.exists(indexFile) && sliceFiles.length > 0) { // old layout
+         final long version = System.currentTimeMillis();
+         Path tempDir = new Path(basePath, "tmp_" + VERSION_PREFIX + version);
+         Path versionDir = getVersionDir(version);
+
+         logger.info("Convert global dict at {} to new layout with version {}", basePath, version);
+
+         fileSystem.mkdirs(tempDir);
+         // convert to new layout
+         try {
+             // copy index and slice files to temp
+             FileUtil.copy(fileSystem, indexFile, fileSystem, tempDir, false, conf);
+             for (FileStatus sliceFile : sliceFiles) {
+                 FileUtil.copy(fileSystem, sliceFile.getPath(), fileSystem, tempDir, false, conf);
+             }
+             // rename; FileSystem.rename can report failure by returning false instead of
+             // throwing. If that were ignored, the originals below and the temp copy in the
+             // finally block would both be deleted and the dictionary would be lost.
+             if (!fileSystem.rename(tempDir, versionDir)) {
+                 throw new IOException("Failed to rename " + tempDir + " to " + versionDir + ", old layout files are kept");
+             }
+             // delete index and slices files in base dir
+             fileSystem.delete(indexFile, false);
+             for (FileStatus sliceFile : sliceFiles) {
+                 fileSystem.delete(sliceFile.getPath(), true);
+             }
+
+         } finally {
+             if (fileSystem.exists(tempDir)) {
+                 fileSystem.delete(tempDir, true);
+             }
+         }
+     }
+ }
+
+ /**
+  * Prepares an empty or pre-populated working directory for a dictionary build: any existing
+  * working directory is removed, then the latest version (if there is one) is copied into it.
+  *
+  * @param workingDir path of the working directory
+  * @throws IOException on file system errors, or if the latest version could not be copied
+  */
+ @Override
+ void prepareForWrite(String workingDir) throws IOException {
+     // TODO create lock file
+     Path working = new Path(workingDir);
+
+     if (fileSystem.exists(working)) {
+         logger.info("Working directory {} exists, delete it first", working);
+         fileSystem.delete(working, true);
+     }
+
+     // when build dict, copy all data into working dir and work on it, avoiding suddenly server crash made data corrupt
+     Long[] versions = listAllVersions();
+     if (versions.length > 0) {
+         Path latestVersion = getVersionDir(versions[versions.length - 1]);
+         // FileUtil.copy signals some failures through its boolean result; building on an
+         // incomplete copy would silently drop existing dictionary entries
+         if (!FileUtil.copy(fileSystem, latestVersion, fileSystem, working, false, true, conf)) {
+             throw new IOException("Failed to copy " + latestVersion + " to working directory " + working);
+         }
+     } else {
+         fileSystem.mkdirs(working);
+     }
+ }
+
+ /**
+  * Lists the versions found under the base path, i.e. the numeric suffixes of all entries
+  * whose name starts with {@code VERSION_PREFIX}.
+  *
+  * @return the versions in ascending order
+  * @throws IOException on file system errors
+  */
+ @Override
+ Long[] listAllVersions() throws IOException {
+     PathFilter versionDirsOnly = new PathFilter() {
+         @Override
+         public boolean accept(Path candidate) {
+             return candidate.getName().startsWith(VERSION_PREFIX);
+         }
+     };
+     // the TreeSet gives the ascending order of the result
+     TreeSet<Long> sorted = new TreeSet<>();
+     for (FileStatus versionDir : fileSystem.listStatus(basePath, versionDirsOnly)) {
+         String dirName = versionDir.getPath().getName();
+         sorted.add(Long.parseLong(dirName.substring(VERSION_PREFIX.length())));
+     }
+     return sorted.toArray(new Long[sorted.size()]);
+ }
+
+ /** Returns the directory of {@code version}: {@code VERSION_PREFIX + version} under the base path. */
+ @Override
+ public Path getVersionDir(long version) {
+     final String dirName = VERSION_PREFIX + version;
+     return new Path(basePath, dirName);
+ }
+
+ /**
+  * Reads the metadata of {@code version} from the single index file in its version directory,
+  * choosing the V1 or V2 reader by the index file name.
+  *
+  * @throws IOException on file system errors
+  * @throws IllegalStateException if the directory does not hold exactly one index file
+  */
+ @Override
+ GlobalDictMetadata getMetadata(long version) throws IOException {
+     final Path versionDir = getVersionDir(version);
+     // V2_INDEX_NAME starts with V1_INDEX_NAME, so this prefix filter accepts both formats
+     PathFilter indexFilesOnly = new PathFilter() {
+         @Override
+         public boolean accept(Path candidate) {
+             return candidate.getName().startsWith(V1_INDEX_NAME);
+         }
+     };
+     FileStatus[] indexFiles = fileSystem.listStatus(versionDir, indexFilesOnly);
+     checkState(indexFiles.length == 1, "zero or more than one index file found: %s", Arrays.toString(indexFiles));
+
+     final String indexName = indexFiles[0].getPath().getName();
+     final IndexFormat format;
+     if (V1_INDEX_NAME.equals(indexName)) {
+         format = new IndexFormatV1(fileSystem, conf);
+     } else if (V2_INDEX_NAME.equals(indexName)) {
+         format = new IndexFormatV2(fileSystem, conf);
+     } else {
+         throw new RuntimeException("Unknown index file: " + indexName);
+     }
+     return format.readIndexFile(versionDir);
+ }
+
+ @Override
+ DictSlice readSlice(String directory, String sliceFileName) {
+ Path path = new Path(directory, sliceFileName);
+ logger.info("read slice from {}", path);
+ try (FSDataInputStream input = fileSystem.open(path, BUFFER_SIZE)) {
+ return DictSlice.deserializeFrom(input);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("read slice %s failed", path), e);
+ }
+ }
+
+ @Override
+ String writeSlice(String workingDir, DictSliceKey key, DictNode slice) {
+ //write new slice
+ String sliceFile = IndexFormatV2.sliceFileName(key);
+ Path path = new Path(workingDir, sliceFile);
+
+ logger.info("write slice with key {} into file {}", key, path);
+ try (FSDataOutputStream out = fileSystem.create(path, true, BUFFER_SIZE)) {
+ byte[] bytes = slice.buildTrieBytes();
+ out.write(bytes);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("write slice with key %s into file %s failed", key, path), e);
+ }
+ return sliceFile;
+ }
+
+ @Override
+ void deleteSlice(String workingDir, String sliceFileName) {
+ Path path = new Path(workingDir, sliceFileName);
+ logger.info("delete slice at {}", path);
+ try {
+ if (fileSystem.exists(path)) {
+ fileSystem.delete(path, false);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("delete slice at %s failed", path), e);
+ }
+ }
+
+ /**
+  * Publishes the working directory as a new version: removes a leftover V1 index file, writes
+  * and sanity-checks the V2 index, renames the working directory to a new version directory
+  * and finally removes expired versions.
+  *
+  * @param workingDir directory holding the slices to publish
+  * @param metadata   metadata to store in the index file
+  * @throws IOException on file system errors, or if the working directory could not be renamed
+  */
+ @Override
+ void commit(String workingDir, GlobalDictMetadata metadata) throws IOException {
+     Path workingPath = new Path(workingDir);
+
+     // delete v1 index file
+     Path oldIndexFile = new Path(workingPath, V1_INDEX_NAME);
+     if (fileSystem.exists(oldIndexFile)) {
+         fileSystem.delete(oldIndexFile, false);
+     }
+     // write v2 index file
+     IndexFormat index = new IndexFormatV2(fileSystem, conf);
+     index.writeIndexFile(workingPath, metadata);
+     index.sanityCheck(workingPath, metadata);
+
+     // move working dir to newVersion dir; FileSystem.rename can report failure by returning
+     // false, which must not be mistaken for a successful commit (and must not be followed by
+     // cleaning up older versions)
+     Path newVersionPath = new Path(basePath, VERSION_PREFIX + System.currentTimeMillis());
+     if (!fileSystem.rename(workingPath, newVersionPath)) {
+         throw new IOException("Failed to rename working directory " + workingPath + " to " + newVersionPath);
+     }
+
+     cleanUp();
+ }
+
+ // Check versions count, delete expired versions
+ private void cleanUp() throws IOException {
+ Long[] versions = listAllVersions();
+ long timestamp = System.currentTimeMillis();
+ for (int i = 0; i < versions.length - maxVersions; i++) {
+ if (versions[i] + versionTTL < timestamp) {
+ fileSystem.delete(getVersionDir(versions[i]), true);
+ }
+ }
+ }
+
+ /**
+  * Copies the latest version of this dictionary below the HDFS working directory of
+  * {@code dstConfig}, keeping the path relative to the working directory. The source is left
+  * untouched; an existing destination version directory is replaced.
+  *
+  * @param srcConfig config whose HDFS working directory contains {@code baseDir}
+  * @param dstConfig config of the destination
+  * @return the base directory of the dictionary in the destination
+  * @throws IOException on file system errors
+  * @throws IllegalArgumentException if {@code baseDir} is not below the source working directory
+  */
+ @Override
+ String copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException {
+     final String srcWorkingDir = srcConfig.getHdfsWorkingDirectory();
+     final String dstWorkingDir = dstConfig.getHdfsWorkingDirectory();
+     // Guava's Preconditions only substitutes %s placeholders, not {}
+     checkArgument(baseDir.startsWith(srcWorkingDir), "Please check why current directory %s doesn't belong to source working directory %s", baseDir, srcWorkingDir);
+
+     // String.replaceFirst treats its arguments as a regex and a replacement pattern; quote both
+     // so that characters such as '.', '+', '$' or '\' in a path are taken literally
+     final String srcPattern = java.util.regex.Pattern.quote(srcWorkingDir);
+     final String dstReplacement = java.util.regex.Matcher.quoteReplacement(dstWorkingDir);
+
+     final String dstBaseDir = baseDir.replaceFirst(srcPattern, dstReplacement);
+
+     Long[] versions = listAllVersions();
+     if (versions.length == 0) { // empty dict, nothing to copy
+         return dstBaseDir;
+     }
+
+     Path srcVersionDir = getVersionDir(versions[versions.length - 1]);
+     Path dstVersionDir = new Path(srcVersionDir.toString().replaceFirst(srcPattern, dstReplacement));
+     FileSystem dstFS = dstVersionDir.getFileSystem(conf);
+     if (dstFS.exists(dstVersionDir)) {
+         dstFS.delete(dstVersionDir, true);
+     }
+     FileUtil.copy(fileSystem, srcVersionDir, dstFS, dstVersionDir, false, true, conf);
+
+     return dstBaseDir;
+ }
+
+ public interface IndexFormat { // reads/writes the index file of one version directory; implemented by IndexFormatV1 and IndexFormatV2
+ GlobalDictMetadata readIndexFile(Path dir) throws IOException; // dir: directory that contains the index file
+
+ void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException; // creates or overwrites the index file in dir
+
+ void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException; // V2 verifies every slice file named in metadata exists in dir; V1 throws UnsupportedOperationException
+ }
+
+ /**
+  * Index format stored in {@code V1_INDEX_NAME}. The file holds only the slice keys; slice
+  * file names are derived from the key via {@link #sliceFileName(DictSliceKey)}.
+  */
+ public static class IndexFormatV1 implements IndexFormat {
+     static final String SLICE_PREFIX = "cached_";
+
+     protected final FileSystem fs;
+     protected final Configuration conf;
+
+     protected IndexFormatV1(FileSystem fs, Configuration conf) {
+         this.fs = fs;
+         this.conf = conf;
+     }
+
+     /**
+      * Reads the V1 index file in {@code dir}. The smallest key read from the file is replaced
+      * by {@code DictSliceKey.START_KEY} in the returned metadata.
+      */
+     @Override
+     public GlobalDictMetadata readIndexFile(Path dir) throws IOException {
+         final Path indexFile = new Path(dir, V1_INDEX_NAME);
+         try (FSDataInputStream in = fs.open(indexFile)) {
+             final int baseId = in.readInt();
+             final int maxId = in.readInt();
+             final int maxValueLength = in.readInt();
+             final int nValues = in.readInt();
+             final String converterName = in.readUTF();
+             final BytesConverter converter;
+             try {
+                 converter = ClassUtil.forName(converterName, BytesConverter.class).newInstance();
+             } catch (Exception e) {
+                 throw new RuntimeException("Fail to instantiate BytesConverter: " + converterName, e);
+             }
+
+             final int sliceCount = in.readInt();
+             TreeMap<DictSliceKey, String> slices = new TreeMap<>();
+             for (int read = 0; read < sliceCount; read++) {
+                 DictSliceKey sliceKey = new DictSliceKey();
+                 sliceKey.readFields(in);
+                 slices.put(sliceKey, sliceFileName(sliceKey));
+             }
+             // make sure first key is always ""
+             DictSliceKey smallest = slices.firstKey();
+             String firstFile = slices.remove(smallest);
+             slices.put(DictSliceKey.START_KEY, firstFile);
+
+             return new GlobalDictMetadata(baseId, maxId, maxValueLength, nValues, converter, slices);
+         }
+     }
+
+     //only for test
+     @Override
+     public void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException {
+         final Path indexFile = new Path(dir, V1_INDEX_NAME);
+         try (FSDataOutputStream out = fs.create(indexFile, true)) {
+             out.writeInt(metadata.baseId);
+             out.writeInt(metadata.maxId);
+             out.writeInt(metadata.maxValueLength);
+             out.writeInt(metadata.nValues);
+             out.writeUTF(metadata.bytesConverter.getClass().getName());
+             out.writeInt(metadata.sliceFileMap.size());
+             // V1 persists the keys only, in map order
+             for (DictSliceKey sliceKey : metadata.sliceFileMap.keySet()) {
+                 sliceKey.write(out);
+             }
+         }
+     }
+
+     @Override
+     public void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException {
+         throw new UnsupportedOperationException("sanityCheck V1 format is no longer supported");
+     }
+
+     /** V1 slice file name: {@code SLICE_PREFIX} followed by the key's string form. */
+     public static String sliceFileName(DictSliceKey key) {
+         return SLICE_PREFIX + key;
+     }
+ }
+
+ /**
+  * Index format stored in {@code V2_INDEX_NAME}. Compared to V1 the file starts with a minor
+  * version byte and stores the slice file name next to each slice key.
+  */
+ public static class IndexFormatV2 extends IndexFormatV1 {
+     static final String SLICE_PREFIX = "cached_";
+     static final int MINOR_VERSION_V1 = 0x01;
+
+     protected IndexFormatV2(FileSystem fs, Configuration conf) {
+         super(fs, conf);
+     }
+
+     /** Reads the V2 index file in {@code dir}; fails on an unknown minor version. */
+     @Override
+     public GlobalDictMetadata readIndexFile(Path dir) throws IOException {
+         final Path indexFile = new Path(dir, V2_INDEX_NAME);
+         try (FSDataInputStream in = fs.open(indexFile)) {
+             // include a header to allow minor format changes
+             final byte minorVersion = in.readByte();
+             if (minorVersion != MINOR_VERSION_V1) {
+                 throw new RuntimeException("Unsupported minor version " + minorVersion);
+             }
+             final int baseId = in.readInt();
+             final int maxId = in.readInt();
+             final int maxValueLength = in.readInt();
+             final int nValues = in.readInt();
+             final String converterName = in.readUTF();
+             final BytesConverter converter;
+             try {
+                 converter = ClassUtil.forName(converterName, BytesConverter.class).newInstance();
+             } catch (Exception e) {
+                 throw new RuntimeException("Fail to instantiate BytesConverter: " + converterName, e);
+             }
+
+             final int sliceCount = in.readInt();
+             TreeMap<DictSliceKey, String> slices = new TreeMap<>();
+             for (int read = 0; read < sliceCount; read++) {
+                 DictSliceKey sliceKey = new DictSliceKey();
+                 sliceKey.readFields(in);
+                 slices.put(sliceKey, in.readUTF());
+             }
+
+             return new GlobalDictMetadata(baseId, maxId, maxValueLength, nValues, converter, slices);
+         }
+     }
+
+     /** Writes the V2 index file into {@code dir}, overwriting an existing one. */
+     @Override
+     public void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException {
+         final Path indexFile = new Path(dir, V2_INDEX_NAME);
+         try (FSDataOutputStream out = fs.create(indexFile, true)) {
+             out.writeByte(MINOR_VERSION_V1);
+             out.writeInt(metadata.baseId);
+             out.writeInt(metadata.maxId);
+             out.writeInt(metadata.maxValueLength);
+             out.writeInt(metadata.nValues);
+             out.writeUTF(metadata.bytesConverter.getClass().getName());
+             out.writeInt(metadata.sliceFileMap.size());
+             for (Map.Entry<DictSliceKey, String> slice : metadata.sliceFileMap.entrySet()) {
+                 DictSliceKey sliceKey = slice.getKey();
+                 String sliceFile = slice.getValue();
+                 sliceKey.write(out);
+                 out.writeUTF(sliceFile);
+             }
+         }
+     }
+
+     /** Fails if any slice file referenced by {@code metadata} is missing from {@code dir}. */
+     @Override
+     public void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException {
+         for (Map.Entry<DictSliceKey, String> slice : metadata.sliceFileMap.entrySet()) {
+             boolean present = fs.exists(new Path(dir, slice.getValue()));
+             if (!present) {
+                 throw new RuntimeException("The slice file " + slice.getValue() + " for the key: " + slice.getKey() + " must be existed!");
+             }
+         }
+     }
+
+     /** V2 slice file name: {@code SLICE_PREFIX}, the current time in millis, '_' and the key's hash code. */
+     public static String sliceFileName(DictSliceKey key) {
+         final long now = System.currentTimeMillis();
+         final int keyHash = key.hashCode();
+         return String.format("%s%d_%d", SLICE_PREFIX, now, keyHash);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/bc6a1c3d/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictMetadata.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictMetadata.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictMetadata.java
new file mode 100644
index 0000000..65c80ca
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictMetadata.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import com.google.common.base.Preconditions;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Encapsulates the metadata for a particular version of the global dictionary.
+ * Usually each version of a global dictionary stores its metadata in an index file.
+ */
+public class GlobalDictMetadata {
+ final int baseId;
+ final int maxId;
+ final int maxValueLength;
+ final int nValues;
+ final BytesConverter bytesConverter;
+ final TreeMap<DictSliceKey, String> sliceFileMap; // slice key -> slice file name
+
+ public GlobalDictMetadata(int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter, NavigableMap<DictSliceKey, String> sliceFileMap) {
+
+ Preconditions.checkNotNull(bytesConverter, "bytesConverter");
+ Preconditions.checkNotNull(sliceFileMap, "sliceFileMap");
+
+ this.baseId = baseId;
+ this.maxId = maxId;
+ this.maxValueLength = maxValueLength;
+ this.nValues = nValues;
+ this.bytesConverter = bytesConverter;
+ this.sliceFileMap = new TreeMap<>(sliceFileMap);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/bc6a1c3d/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java
new file mode 100644
index 0000000..5817868
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+
+import java.io.IOException;
+
+public abstract class GlobalDictStore {
+
+ protected final String baseDir; // base directory containing all versions of this global dict
+ protected final int maxVersions; // from KylinConfig.getAppendDictMaxVersions()
+ protected final int versionTTL; // from KylinConfig.getAppendDictVersionTTL()
+
+ protected GlobalDictStore(String baseDir) {
+ this.baseDir = Preconditions.checkNotNull(baseDir, "baseDir");
+ this.maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions();
+ this.versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL();
+ }
+
+ // workingDir should be an absolute path, will create if not exists
+ abstract void prepareForWrite(String workingDir) throws IOException;
+
+ /**
+ * @return all versions of this dictionary in ascending order
+ * @throws IOException on I/O error
+ */
+ abstract Long[] listAllVersions() throws IOException;
+
+ // return the path of specified version dir
+ abstract Path getVersionDir(long version);
+
+ /**
+ * Get the metadata for a particular version of the dictionary.
+ * @param version version number
+ * @return <i>GlobalDictMetadata</i> for the specified version
+ * @throws IOException on I/O error
+ */
+ abstract GlobalDictMetadata getMetadata(long version) throws IOException;
+
+ /**
+ * Read a <i>DictSlice</i> from a slice file.
+ * No checked exception is declared, so an I/O error cannot surface as an IOException here;
+ * GlobalDictHDFSStore wraps it in a RuntimeException.
+ * @param workingDir directory of the slice file
+ * @param sliceFileName file name of the slice
+ * @return a <i>DictSlice</i>
+ */
+ abstract DictSlice readSlice(String workingDir, String sliceFileName);
+
+ /**
+ * Write a slice with the given key to the specified directory.
+ * No checked exception is declared, so an I/O error cannot surface as an IOException here;
+ * GlobalDictHDFSStore wraps it in a RuntimeException.
+ * @param workingDir where to write the slice, should exist
+ * @param key slice key
+ * @param slice slice to write
+ * @return file name of the new written slice
+ */
+ abstract String writeSlice(String workingDir, DictSliceKey key, DictNode slice);
+
+ /**
+ * Delete a slice with the specified file name.
+ * No checked exception is declared, so an I/O error cannot surface as an IOException here;
+ * GlobalDictHDFSStore wraps it in a RuntimeException.
+ * @param workingDir directory of the slice file, should exist
+ * @param sliceFileName file name of the slice, should exist
+ */
+ abstract void deleteSlice(String workingDir, String sliceFileName);
+
+ /**
+ * Commit the <i>DictSlice</i> files and the <i>GlobalDictMetadata</i> in workingDir as a new version dir.
+ * @param workingDir where the temporary slices and index are stored, should exist
+ * @param globalDictMetadata the metadata of global dict
+ * @throws IOException on I/O error
+ */
+ abstract void commit(String workingDir, GlobalDictMetadata globalDictMetadata) throws IOException;
+
+ /**
+ * Copy the latest version of this dict to another meta. The source is unchanged.
+ * @param srcConfig config of source meta
+ * @param dstConfig config of destination meta
+ * @return the new base directory for destination meta
+ * @throws IOException on I/O error
+ */
+ abstract String copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/bc6a1c3d/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index cda3c2b..7921980 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -20,6 +20,7 @@ package org.apache.kylin.dict;
import java.io.IOException;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Dictionary;
/**
@@ -28,7 +29,7 @@ import org.apache.kylin.common.util.Dictionary;
* Created by sunyerui on 16/5/24.
*/
public class GlobalDictionaryBuilder implements IDictionaryBuilder {
- AppendTrieDictionary.Builder<String> builder;
+ AppendTrieDictionaryBuilder builder;
int baseId;
@Override
@@ -36,19 +37,20 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
if (dictInfo == null) {
throw new IllegalArgumentException("GlobalDictinaryBuilder must used with an existing DictionaryInfo");
}
- this.builder = AppendTrieDictionary.Builder.getInstance(dictInfo.getResourceDir());
+
+ int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
+ this.builder = new AppendTrieDictionaryBuilder(dictInfo.getResourceDir(), maxEntriesPerSlice);
this.baseId = baseId;
}
-
+
@Override
public boolean addValue(String value) {
if (value == null)
return false;
-
builder.addValue(value);
return true;
}
-
+
@Override
public Dictionary<String> build() throws IOException {
return builder.build(baseId);
http://git-wip-us.apache.org/repos/asf/kylin/blob/bc6a1c3d/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
index e2af338..9da5071 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
@@ -18,6 +18,7 @@
package org.apache.kylin.dict;
+import static org.apache.kylin.dict.GlobalDictHDFSStore.V2_INDEX_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -29,62 +30,76 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
-/**
- * Created by sunyerui on 16/4/28.
- */
public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
- public static final String BASE_DIR = "file:///tmp/kylin_append_dict";
- public static final String RESOURCE_DIR = "/dict/append_dict_test";
+ private static final UUID uuid = UUID.randomUUID();
+ private static final String RESOURCE_DIR = "/dict/append_dict_test/" + uuid;
+ private static final String HDFS_DIR = "file:///tmp/kylin_append_dict";
+ private static String BASE_DIR;
+ private static String LOCAL_BASE_DIR = "/tmp/kylin_append_dict/kylin_metadata/resources/GlobalDict" + RESOURCE_DIR + "/";
@Before
- public void setUp() {
+ public void beforeTest() {
staticCreateTestMetadata();
- System.setProperty("kylin.dictionary.append-entry-size", "50000");
- System.setProperty("kylin.env.hdfs-working-dir", BASE_DIR);
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "50000");
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.env.hdfs-working-dir", HDFS_DIR);
+ BASE_DIR = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict" + RESOURCE_DIR + "/";
}
@After
- public void after() {
+ public void afterTest() {
cleanup();
staticCleanupTestMetadata();
}
- public static void cleanup() {
- Path basePath = new Path(BASE_DIR);
+ private void cleanup() {
+ Path basePath = new Path(HDFS_DIR);
try {
HadoopUtil.getFileSystem(basePath).delete(basePath, true);
- } catch (IOException e) {}
+ } catch (IOException e) {
+ }
}
- public static final String[] words = new String[] { "paint", "par", "part", "parts", "partition", "partitions", "party", "partie", "parties", "patient", "taste", "tar", "trie", "try", "tries", "\u5b57\u5178", "\u5b57\u5178\u6811", "\u5b57\u6bcd", // non-ascii characters
+ private static final String[] words = new String[] { "paint", "par", "part", "parts", "partition", "partitions", "party", "partie", "parties", "patient", "taste", "tar", "trie", "try", "tries", "\u5b57\u5178", "\u5b57\u5178\u6811", "\u5b57\u6bcd", // non-ascii characters
"", // empty
- "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
- "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
+ "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
"paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
- + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk",
+ + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk",
"paint", "tar", "try", // some dup
};
+ private AppendTrieDictionaryBuilder createBuilder(String resourceDir) throws IOException {
+ int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
+ return new AppendTrieDictionaryBuilder(resourceDir, maxEntriesPerSlice);
+ }
+
@Test
public void testStringRepeatly() throws IOException {
ArrayList<String> list = new ArrayList<>();
@@ -94,20 +109,22 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
notfound.add("pars");
notfound.add("tri");
notfound.add("\u5b57");
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < 50; i++) {
testStringDictAppend(list, notfound, true);
+ //to speed up the test
+ cleanup();
}
}
@Test
- public void englishWordsTest() throws Exception {
+ public void testEnglishWords() throws Exception {
InputStream is = new FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt");
ArrayList<String> str = loadStrings(is);
testStringDictAppend(str, null, false);
}
@Test
- public void categoryNamesTest() throws Exception {
+ public void testCategoryNames() throws Exception {
InputStream is = new FileInputStream("src/test/resources/dict/dw_category_grouping_names.dat");
ArrayList<String> str = loadStrings(is);
testStringDictAppend(str, null, true);
@@ -133,7 +150,8 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
@Ignore("need huge key set")
@Test
public void testHugeKeySet() throws IOException {
- AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+
AppendTrieDictionary<String> dict = null;
InputStream is = new FileInputStream("src/test/resources/dict/huge_key");
@@ -143,17 +161,17 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
while ((word = reader.readLine()) != null) {
word = word.trim();
if (!word.isEmpty())
- b.addValue(word);
+ builder.addValue(word);
}
} finally {
reader.close();
is.close();
}
- dict = b.build(0);
+ dict = builder.build(0);
dict.dump(System.out);
}
- private static void testStringDictAppend(ArrayList<String> list, ArrayList<String> notfound, boolean shuffleList) throws IOException {
+ private void testStringDictAppend(ArrayList<String> list, ArrayList<String> notfound, boolean shuffleList) throws IOException {
Random rnd = new Random(System.currentTimeMillis());
ArrayList<String> strList = new ArrayList<String>();
strList.addAll(list);
@@ -162,8 +180,8 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
}
BytesConverter converter = new StringBytesConverter();
- AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
- AppendTrieDictionary<String> dict = null;
+ AppendTrieDictionaryBuilder b = createBuilder(RESOURCE_DIR);
+
TreeMap<Integer, String> checkMap = new TreeMap<>();
int firstAppend = rnd.nextInt(strList.size() / 2);
int secondAppend = firstAppend + rnd.nextInt((strList.size() - firstAppend) / 2);
@@ -173,7 +191,7 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
for (; appendIndex < firstAppend; appendIndex++) {
b.addValue(strList.get(appendIndex));
}
- dict = b.build(0);
+ AppendTrieDictionary<String> dict = b.build(0);
dict.dump(System.out);
for (; checkIndex < firstAppend; checkIndex++) {
String str = strList.get(checkIndex);
@@ -185,13 +203,13 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
}
// reopen dict and append
-// b = AppendTrieDictionary.Builder.create(dict);
- b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict);
+ b = createBuilder(RESOURCE_DIR);
+
for (; appendIndex < secondAppend; appendIndex++) {
b.addValue(strList.get(appendIndex));
}
- AppendTrieDictionary newDict = b.build(0);
- assert newDict == dict;
+ AppendTrieDictionary<String> newDict = b.build(0);
+ assert newDict.equals(dict);
dict = newDict;
dict.dump(System.out);
checkIndex = 0;
@@ -210,12 +228,13 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
}
// reopen dict and append rest str
- b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict);
+ b = createBuilder(RESOURCE_DIR);
+
for (; appendIndex < strList.size(); appendIndex++) {
b.addValue(strList.get(appendIndex));
}
newDict = b.build(0);
- assert newDict == dict;
+ assert newDict.equals(dict);
dict = newDict;
dict.dump(System.out);
checkIndex = 0;
@@ -268,7 +287,7 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
@Test
public void testMaxInteger() throws IOException {
- AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
builder.setMaxId(Integer.MAX_VALUE - 2);
builder.addValue("a");
builder.addValue("ab");
@@ -284,7 +303,7 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
@Ignore("Only occurred when value is very long (>8000 bytes)")
@Test
public void testSuperLongValue() throws IOException {
- AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
String value = "a";
for (int i = 0; i < 10000; i++) {
value += "a";
@@ -299,17 +318,15 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
dictionary.getMaxId();
}
- private static class SharedBuilderThread extends Thread {
+ private class SharedBuilderThread extends Thread {
CountDownLatch startLatch;
CountDownLatch finishLatch;
- String resourcePath;
String prefix;
int count;
- SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String resourcePath, String prefix, int count) {
+ SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String prefix, int count) {
this.startLatch = startLatch;
this.finishLatch = finishLatch;
- this.resourcePath = resourcePath;
this.prefix = prefix;
this.count = count;
}
@@ -317,27 +334,28 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
@Override
public void run() {
try {
- AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(resourcePath);
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
startLatch.countDown();
for (int i = 0; i < count; i++) {
builder.addValue(prefix + i);
}
builder.build(0);
finishLatch.countDown();
- } catch (IOException e) {}
+ } catch (IOException e) {
+ }
}
}
+ @Ignore
@Test
public void testSharedBuilder() throws IOException, InterruptedException {
- String resourcePath = "shared_builder";
final CountDownLatch startLatch = new CountDownLatch(3);
final CountDownLatch finishLatch = new CountDownLatch(3);
- AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(resourcePath);
- Thread t1 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t1_", 10000);
- Thread t2 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t2_", 10);
- Thread t3 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t3_", 100000);
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+ Thread t1 = new SharedBuilderThread(startLatch, finishLatch, "t1_", 10000);
+ Thread t2 = new SharedBuilderThread(startLatch, finishLatch, "t2_", 10);
+ Thread t3 = new SharedBuilderThread(startLatch, finishLatch, "t3_", 100000);
t1.start();
t2.start();
t3.start();
@@ -345,23 +363,228 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
AppendTrieDictionary dict = builder.build(0);
assertTrue("AppendDictBuilder Thread too slow", finishLatch.await(3000, TimeUnit.MILLISECONDS));
assertEquals(110010, dict.getMaxId());
- try {
- builder.addValue("fail");
- fail("Builder should be closed");
- } catch (Exception e) {}
- builder = AppendTrieDictionary.Builder.getInstance(resourcePath, dict);
+ builder = createBuilder(RESOURCE_DIR);
builder.addValue("success");
+ builder.addValue("s");
dict = builder.build(0);
- for (int i = 0; i < 10000; i ++) {
+ for (int i = 0; i < 10000; i++) {
assertNotEquals(-1, dict.getIdFromValue("t1_" + i));
}
- for (int i = 0; i < 10; i ++) {
+ for (int i = 0; i < 10; i++) {
assertNotEquals(-1, dict.getIdFromValue("t2_" + i));
}
- for (int i = 0; i < 100000; i ++) {
+ for (int i = 0; i < 100000; i++) {
assertNotEquals(-1, dict.getIdFromValue("t3_" + i));
}
assertEquals(110011, dict.getIdFromValue("success"));
+ assertEquals(110012, dict.getIdFromValue("s"));
+ }
+
+ @Test
+ public void testSplitContainSuperLongValue() throws IOException {
+ String superLongValue = "%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B";
+
+ createAppendTrieDict(Arrays.asList("a", superLongValue));
+ }
+
+ @Test
+ public void testSuperLongValueAsFileName() throws IOException {
+ String superLongValue = "%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B";
+
+ createAppendTrieDict(Arrays.asList("a", superLongValue));
+ }
+
+ @Test
+ public void testIllegalFileNameValue() throws IOException {
+ createAppendTrieDict(Arrays.asList("::", ":"));
+ }
+
+ @Test
+ public void testSkipAddValue() throws IOException {
+ createAppendTrieDict(new ArrayList<String>());
+ }
+
+ private void createAppendTrieDict(List<String> valueList) throws IOException {
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "1");
+
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+
+ for (String value : valueList) {
+ builder.addValue(value);
+ }
+
+ builder.build(0);
+ }
+
+ private static class CachedFileFilter implements FileFilter {
+ @Override
+ public boolean accept(File pathname) {
+ return pathname.getName().startsWith("cached_");
+ }
+ }
+
+ private static class VersionFilter implements FileFilter {
+ @Override
+ public boolean accept(File pathname) {
+ return pathname.getName().startsWith(GlobalDictHDFSStore.VERSION_PREFIX);
+ }
+ }
+
+ @Test
+ public void testMultiVersions() throws IOException, InterruptedException {
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
+
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+ builder.addValue("a");
+ builder.addValue("b");
+ builder.addValue("c");
+ builder.addValue("d");
+ builder.addValue("e");
+ builder.addValue("f");
+ AppendTrieDictionary dict = builder.build(0);
+
+ assertEquals(2, dict.getIdFromValue("b"));
+
+ // re-open dict, append new data
+ builder = createBuilder(RESOURCE_DIR);
+ builder.addValue("g");
+
+ // new data is not visible
+ try {
+ dict.getIdFromValue("g");
+ fail("Value 'g' (g) not exists!");
+ } catch (IllegalArgumentException e) {
+
+ }
+
+ // append more data; it becomes visible only in the newly built dictionary
+ builder.addValue("h");
+
+ AppendTrieDictionary newDict = builder.build(0);
+ assert newDict.equals(dict);
+
+ assertEquals(7, newDict.getIdFromValue("g"));
+ assertEquals(8, newDict.getIdFromValue("h"));
+
+ // Check versions retention
+ File dir = new File(LOCAL_BASE_DIR);
+ assertEquals(2, dir.listFiles(new VersionFilter()).length);
+ }
+
+ @Test
+ public void testVersionRetention() throws IOException, InterruptedException {
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-max-versions", "1");
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-version-ttl", "1000");
+
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+ builder.addValue("a");
+
+ //version 1
+ builder.build(0);
+
+ // Check versions retention
+ File dir = new File(LOCAL_BASE_DIR);
+ assertEquals(1, dir.listFiles(new VersionFilter()).length);
+
+ // sleep to make version 1 expired
+ Thread.sleep(1200);
+
+ //version 2
+ builder = createBuilder(RESOURCE_DIR);
+ builder.addValue("");
+ builder.build(0);
+
+ // Check versions retention
+ assertEquals(1, dir.listFiles(new VersionFilter()).length);
+ }
+
+ @Test
+ public void testOldDirFormat() throws IOException {
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
+
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+ builder.addValue("a");
+ builder.addValue("b");
+ builder.addValue("c");
+ builder.addValue("d");
+ builder.addValue("e");
+ builder.addValue("f");
+ builder.build(0);
+
+ convertDirToOldFormat(BASE_DIR);
+
+ File dir = new File(LOCAL_BASE_DIR);
+ assertEquals(0, dir.listFiles(new VersionFilter()).length);
+ assertEquals(3, dir.listFiles(new CachedFileFilter()).length);
+
+ //convert older format to new format when builder init
+ builder = createBuilder(RESOURCE_DIR);
+ builder.build(0);
+
+ assertEquals(1, dir.listFiles(new VersionFilter()).length);
+ }
+
+ private void convertDirToOldFormat(String baseDir) throws IOException {
+ Path basePath = new Path(baseDir);
+ FileSystem fs = HadoopUtil.getFileSystem(basePath);
+
+ // move version dir to base dir, to simulate the older format
+ GlobalDictHDFSStore store = new GlobalDictHDFSStore(baseDir);
+ Long[] versions = store.listAllVersions();
+ Path versionPath = store.getVersionDir(versions[versions.length - 1]);
+ Path tmpVersionPath = new Path(versionPath.getParent().getParent(), versionPath.getName());
+ fs.rename(versionPath, tmpVersionPath);
+ fs.delete(new Path(baseDir), true);
+ fs.rename(tmpVersionPath, new Path(baseDir));
+ }
+
+ @Test
+ public void testOldIndexFormat() throws IOException {
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
+
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+ builder.addValue("a");
+ builder.addValue("b");
+ builder.addValue("c");
+ builder.addValue("d");
+ builder.addValue("e");
+ builder.addValue("f");
+ builder.build(0);
+
+ convertIndexToOldFormat(BASE_DIR);
+
+ builder = createBuilder(RESOURCE_DIR);
+ builder.addValue("g");
+ builder.addValue("h");
+ builder.addValue("i");
+ AppendTrieDictionary dict = builder.build(0);
+
+ assertEquals(1, dict.getIdFromValue("a"));
+ assertEquals(7, dict.getIdFromValue("g"));
+ }
+
+ private void convertIndexToOldFormat(String baseDir) throws IOException {
+ Path basePath = new Path(baseDir);
+ FileSystem fs = HadoopUtil.getFileSystem(basePath);
+
+ GlobalDictHDFSStore store = new GlobalDictHDFSStore(baseDir);
+ Long[] versions = store.listAllVersions();
+ GlobalDictMetadata metadata = store.getMetadata(versions[versions.length - 1]);
+
+ //convert v2 index to v1 index
+ Path versionPath = store.getVersionDir(versions[versions.length - 1]);
+ Path v2IndexFile = new Path(versionPath, V2_INDEX_NAME);
+
+ fs.delete(v2IndexFile, true);
+ GlobalDictHDFSStore.IndexFormat indexFormatV1 = new GlobalDictHDFSStore.IndexFormatV1(fs, HadoopUtil.getCurrentConfiguration());
+ indexFormatV1.writeIndexFile(versionPath, metadata);
+
+ //convert v2 fileName format to v1 fileName format
+ for (Map.Entry<DictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) {
+ fs.rename(new Path(versionPath, entry.getValue()), new Path(versionPath, "cached_" + entry.getKey()));
+ }
}
+
}
[3/7] kylin git commit: KYLIN-2506 Refactor Global Dictionary
Posted by li...@apache.org.
KYLIN-2506 Refactor Global Dictionary
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/bc6a1c3d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/bc6a1c3d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/bc6a1c3d
Branch: refs/heads/master-KYLIN-2506
Commit: bc6a1c3d0cc909e42249619de2072e1f2acc2d59
Parents: 7d6329b
Author: kangkaisen <ka...@163.com>
Authored: Mon Feb 20 21:06:44 2017 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Apr 16 17:12:49 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/dict/AppendTrieDictionary.java | 1197 +-----------------
.../kylin/dict/AppendTrieDictionaryBuilder.java | 289 +++++
.../kylin/dict/AppendTrieDictionaryChecker.java | 9 +-
.../org/apache/kylin/dict/CachedTreeMap.java | 481 -------
.../java/org/apache/kylin/dict/DictNode.java | 376 ++++++
.../java/org/apache/kylin/dict/DictSlice.java | 283 +++++
.../org/apache/kylin/dict/DictSliceKey.java | 75 ++
.../apache/kylin/dict/GlobalDictHDFSStore.java | 420 ++++++
.../apache/kylin/dict/GlobalDictMetadata.java | 50 +
.../org/apache/kylin/dict/GlobalDictStore.java | 102 ++
.../kylin/dict/GlobalDictionaryBuilder.java | 12 +-
.../kylin/dict/AppendTrieDictionaryTest.java | 329 ++++-
.../apache/kylin/dict/CachedTreeMapTest.java | 378 ------
.../engine/spark/KylinKryoRegistrator.java | 3 -
14 files changed, 1946 insertions(+), 2058 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/bc6a1c3d/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
index 962686d..ea216ba 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
@@ -15,1173 +15,125 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kylin.dict;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.NavigableSet;
import java.util.Objects;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metadata.MetadataManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.ExecutionException;
+import static com.google.common.base.Preconditions.checkState;
/**
* A dictionary based on Trie data structure that maps enumerations of byte[] to
* int IDs, used for global dictionary.
- *
- * Trie data is split into sub trees, called {@link DictSlice}, and stored in a {@link CachedTreeMap} with a configurable cache size.
- *
+ * <p>
+ * Trie data is split into sub trees, called {@link DictSlice}.
+ * <p>
* With Trie the memory footprint of the mapping is kinda minimized at the cost
* CPU, if compared to HashMap of ID Arrays. Performance test shows Trie is
* roughly 10 times slower, so there's a cache layer overlays on top of Trie and
* gracefully fall back to Trie using a weak reference.
- *
+ * <p>
* The implementation is NOT thread-safe for now.
- *
+ * <p>
* TODO making it thread-safe
*
* @author sunyerui
*/
@SuppressWarnings({ "rawtypes", "unchecked", "serial" })
public class AppendTrieDictionary<T> extends CacheDictionary<T> {
-
public static final byte[] HEAD_MAGIC = new byte[] { 0x41, 0x70, 0x70, 0x65, 0x63, 0x64, 0x54, 0x72, 0x69, 0x65, 0x44, 0x69, 0x63, 0x74 }; // "AppendTrieDict"
public static final int HEAD_SIZE_I = HEAD_MAGIC.length;
-
- public static final int BIT_IS_LAST_CHILD = 0x80;
- public static final int BIT_IS_END_OF_VALUE = 0x40;
-
private static final Logger logger = LoggerFactory.getLogger(AppendTrieDictionary.class);
transient private String baseDir;
- transient private int maxId;
- transient private int maxValueLength;
- transient private int nValues;
-
- volatile private TreeMap<DictSliceKey, DictSlice> dictSliceMap;
-
- // Constructor both for build and deserialize
- public AppendTrieDictionary() {
- enableCache();
- }
+ transient private GlobalDictMetadata metadata;
+ transient private LoadingCache<DictSliceKey, DictSlice> dictCache;
- public void initParams(String baseDir, int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter) throws IOException {
+ public void init(String baseDir) throws IOException {
this.baseDir = baseDir;
- this.baseId = baseId;
- this.maxId = maxId;
- this.maxValueLength = maxValueLength;
- this.nValues = nValues;
- this.bytesConvert = bytesConverter;
- }
-
- public void initDictSliceMap(CachedTreeMap dictMap) throws IOException {
- int maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions();
- long versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL();
- CachedTreeMap newDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(1).baseDir(baseDir).immutable(true).maxVersions(maxVersions).versionTTL(versionTTL).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
- newDictSliceMap.loadEntry(dictMap);
- this.dictSliceMap = newDictSliceMap;
- }
-
- public byte[] writeDictMap() throws IOException {
- ByteArrayOutputStream buf = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(buf);
- ((Writable) dictSliceMap).write(out);
- byte[] dictMapBytes = buf.toByteArray();
- buf.close();
- out.close();
-
- return dictMapBytes;
- }
-
- // The dict id starts from 1 to 2147483647 and 2147483648 to -2, leave 0 and -1 used for uninitialized state
- public static void checkValidId(int id) {
- if (id == 0 || id == -1) {
- throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294");
- }
- }
-
- public static class DictSliceKey implements WritableComparable, java.io.Serializable {
- byte[] key;
-
- public static DictSliceKey wrap(byte[] key) {
- DictSliceKey dictKey = new DictSliceKey();
- dictKey.key = key;
- return dictKey;
- }
-
- @Override
- public String toString() {
- return Bytes.toStringBinary(key);
- }
-
- @Override
- public int hashCode() {
- return Arrays.hashCode(key);
- }
-
- @Override
- public int compareTo(Object o) {
- if (!(o instanceof DictSliceKey)) {
- return -1;
- }
- DictSliceKey other = (DictSliceKey) o;
- return Bytes.compareTo(key, other.key);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(key.length);
- out.write(key);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- key = new byte[in.readInt()];
- in.readFully(key);
- }
- }
-
- public static class DictSlice<T> implements Writable, java.io.Serializable {
- public DictSlice() {
- }
-
- public DictSlice(byte[] trieBytes) {
- init(trieBytes);
- }
-
- private byte[] trieBytes;
-
- // non-persistent part
- transient private int headSize;
- transient private int bodyLen;
- transient private int sizeChildOffset;
-
- transient private int nValues;
- transient private int sizeOfId;
- // mask MUST be long, since childOffset maybe 5 bytes at most
- transient private long childOffsetMask;
- transient private int firstByteOffset;
-
- private void init(byte[] trieBytes) {
- this.trieBytes = trieBytes;
- if (BytesUtil.compareBytes(HEAD_MAGIC, 0, trieBytes, 0, HEAD_MAGIC.length) != 0)
- throw new IllegalArgumentException("Wrong file type (magic does not match)");
-
- try {
- DataInputStream headIn = new DataInputStream(new ByteArrayInputStream(trieBytes, HEAD_SIZE_I, trieBytes.length - HEAD_SIZE_I));
- this.headSize = headIn.readShort();
- this.bodyLen = headIn.readInt();
- this.nValues = headIn.readInt();
- this.sizeChildOffset = headIn.read();
- this.sizeOfId = headIn.read();
-
- this.childOffsetMask = ~(((long) (BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE)) << ((sizeChildOffset - 1) * 8));
- this.firstByteOffset = sizeChildOffset + 1; // the offset from begin of node to its first value byte
- } catch (Exception e) {
- if (e instanceof RuntimeException)
- throw (RuntimeException) e;
- else
- throw new RuntimeException(e);
- }
- }
-
- public byte[] getFirstValue() {
- int nodeOffset = headSize;
- ByteArrayOutputStream bytes = new ByteArrayOutputStream();
- while (true) {
- int valueLen = BytesUtil.readUnsigned(trieBytes, nodeOffset + firstByteOffset - 1, 1);
- bytes.write(trieBytes, nodeOffset + firstByteOffset, valueLen);
- if (checkFlag(nodeOffset, BIT_IS_END_OF_VALUE)) {
- break;
- }
- nodeOffset = headSize + (int) (BytesUtil.readLong(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask);
- if (nodeOffset == headSize) {
- break;
- }
- }
- return bytes.toByteArray();
- }
-
- /**
- * returns a code point from [0, nValues), preserving order of value
- *
- * @param n
- * -- the offset of current node
- * @param inp
- * -- input value bytes to lookup
- * @param o
- * -- offset in the input value bytes matched so far
- * @param inpEnd
- * -- end of input
- * @param roundingFlag
- * -- =0: return -1 if not found
- * -- <0: return closest smaller if not found, return -1
- * -- >0: return closest bigger if not found, return nValues
- */
- private int lookupSeqNoFromValue(int n, byte[] inp, int o, int inpEnd, int roundingFlag) {
- while (true) {
- // match the current node
- int p = n + firstByteOffset; // start of node's value
- int end = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1); // end of node's value
- for (; p < end && o < inpEnd; p++, o++) { // note matching start from [0]
- if (trieBytes[p] != inp[o]) {
- return -1; // mismatch
- }
- }
-
- // node completely matched, is input all consumed?
- boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
- if (o == inpEnd) {
- return p == end && isEndOfValue ? BytesUtil.readUnsigned(trieBytes, end, sizeOfId) : -1;
- }
-
- // find a child to continue
- int c = headSize + (int) (BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask);
- if (c == headSize) // has no children
- return -1;
- byte inpByte = inp[o];
- int comp;
- while (true) {
- p = c + firstByteOffset;
- comp = BytesUtil.compareByteUnsigned(trieBytes[p], inpByte);
- if (comp == 0) { // continue in the matching child, reset n and loop again
- n = c;
- break;
- } else if (comp < 0) { // try next child
- if (checkFlag(c, BIT_IS_LAST_CHILD))
- return -1;
- c = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1) + (checkFlag(c, BIT_IS_END_OF_VALUE) ? sizeOfId : 0);
- } else { // children are ordered by their first value byte
- return -1;
- }
- }
- }
- }
-
- private boolean checkFlag(int offset, int bit) {
- return (trieBytes[offset] & bit) > 0;
- }
-
- public int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag) {
- int id = lookupSeqNoFromValue(headSize, value, offset, offset + len, roundingFlag);
- return id;
- }
-
- private DictNode rebuildTrieTree() {
- return rebuildTrieTreeR(headSize, null);
- }
-
- private DictNode rebuildTrieTreeR(int n, DictNode parent) {
- DictNode root = null;
- while (true) {
- int p = n + firstByteOffset;
- int childOffset = (int) (BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask);
- int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
- boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
-
- byte[] value = new byte[parLen];
- System.arraycopy(trieBytes, p, value, 0, parLen);
-
- DictNode node = new DictNode(value, isEndOfValue);
- if (isEndOfValue) {
- int id = BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId);
- node.id = id;
- }
-
- if (parent == null) {
- root = node;
- } else {
- parent.addChild(node);
- }
-
- if (childOffset != 0) {
- rebuildTrieTreeR(childOffset + headSize, node);
- }
-
- if (checkFlag(n, BIT_IS_LAST_CHILD)) {
- break;
- } else {
- n += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0);
- }
- }
- return root;
- }
-
- public boolean doCheck() {
- int offset = headSize;
- HashSet<Integer> parentSet = new HashSet<>();
- boolean lastChild = false;
-
- while (offset < trieBytes.length) {
- if (lastChild) {
- boolean contained = parentSet.remove(offset - headSize);
- // Can't find parent, the data is corrupted
- if (!contained) {
- return false;
- }
- lastChild = false;
- }
- int p = offset + firstByteOffset;
- int childOffset = (int) (BytesUtil.readLong(trieBytes, offset, sizeChildOffset) & childOffsetMask);
- int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
- boolean isEndOfValue = checkFlag(offset, BIT_IS_END_OF_VALUE);
-
- // Copy value overflow, the data is corrupted
- if (trieBytes.length < p + parLen) {
- return false;
- }
-
- // Check id is fine
- if (isEndOfValue) {
- BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId);
- }
-
- // Record it if has children
- if (childOffset != 0) {
- parentSet.add(childOffset);
- }
-
- // brothers done, move to next parent
- if (checkFlag(offset, BIT_IS_LAST_CHILD)) {
- lastChild = true;
- }
-
- // move to next node
- offset += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0);
- }
-
- // ParentMap is empty, meaning all nodes has parent, the data is correct
- return parentSet.isEmpty();
- }
-
- public void write(DataOutput out) throws IOException {
- out.write(trieBytes);
- }
-
- public void readFields(DataInput in) throws IOException {
- byte[] headPartial = new byte[HEAD_MAGIC.length + Short.SIZE / Byte.SIZE + Integer.SIZE / Byte.SIZE];
- in.readFully(headPartial);
-
- if (BytesUtil.compareBytes(HEAD_MAGIC, 0, headPartial, 0, HEAD_MAGIC.length) != 0)
- throw new IllegalArgumentException("Wrong file type (magic does not match)");
-
- DataInputStream headIn = new DataInputStream(//
- new ByteArrayInputStream(headPartial, HEAD_SIZE_I, headPartial.length - HEAD_SIZE_I));
- int headSize = headIn.readShort();
- int bodyLen = headIn.readInt();
- headIn.close();
-
- byte[] all = new byte[headSize + bodyLen];
- System.arraycopy(headPartial, 0, all, 0, headPartial.length);
- in.readFully(all, headPartial.length, all.length - headPartial.length);
-
- init(all);
- }
-
- public static DictNode rebuildNodeByDeserialize(DataInput in) throws IOException {
- DictSlice slice = new DictSlice();
- slice.readFields(in);
- return slice.rebuildTrieTree();
- }
-
- @Override
- public String toString() {
- return String.format("DictSlice[firstValue=%s, values=%d, bytes=%d]", Bytes.toStringBinary(getFirstValue()), nValues, bodyLen);
- }
-
- @Override
- public int hashCode() {
- return Arrays.hashCode(trieBytes);
- }
-
- @Override
- public boolean equals(Object o) {
- if ((o instanceof AppendTrieDictionary.DictSlice) == false) {
- logger.info("Equals return false because it's not DictInfo");
- return false;
- }
- DictSlice that = (DictSlice) o;
- return Arrays.equals(this.trieBytes, that.trieBytes);
- }
- }
-
- public static class DictNode implements Writable, java.io.Serializable {
- public byte[] part;
- public int id = -1;
- public boolean isEndOfValue;
- public ArrayList<DictNode> children = new ArrayList<>();
-
- public int nValuesBeneath;
- public DictNode parent;
- public int childrenCount = 1;
-
- public DictNode() {
- }
-
- public void clone(DictNode o) {
- this.part = o.part;
- this.id = o.id;
- this.isEndOfValue = o.isEndOfValue;
- this.children = o.children;
- for (DictNode child : o.children) {
- child.parent = this;
- }
- this.nValuesBeneath = o.nValuesBeneath;
- this.parent = o.parent;
- this.childrenCount = o.childrenCount;
- }
-
- DictNode(byte[] value, boolean isEndOfValue) {
- reset(value, isEndOfValue);
- }
-
- DictNode(byte[] value, boolean isEndOfValue, ArrayList<DictNode> children) {
- reset(value, isEndOfValue, children);
- }
-
- void reset(byte[] value, boolean isEndOfValue) {
- reset(value, isEndOfValue, new ArrayList<DictNode>());
- }
-
- void reset(byte[] value, boolean isEndOfValue, ArrayList<DictNode> children) {
- this.part = value;
- this.isEndOfValue = isEndOfValue;
- clearChild();
- for (DictNode child : children) {
- addChild(child);
- }
- this.id = -1;
- }
-
- void clearChild() {
- this.children.clear();
- int childrenCountDelta = this.childrenCount - 1;
- for (DictNode p = this; p != null; p = p.parent) {
- p.childrenCount -= childrenCountDelta;
- }
- }
-
- void addChild(DictNode child) {
- addChild(-1, child);
- }
-
- void addChild(int index, DictNode child) {
- child.parent = this;
- if (index < 0) {
- this.children.add(child);
- } else {
- this.children.add(index, child);
- }
- for (DictNode p = this; p != null; p = p.parent) {
- p.childrenCount += child.childrenCount;
- }
- }
-
- public DictNode removeChild(int index) {
- DictNode child = children.remove(index);
- child.parent = null;
- for (DictNode p = this; p != null; p = p.parent) {
- p.childrenCount -= child.childrenCount;
- }
- return child;
- }
-
- public DictNode duplicateNode() {
- DictNode newChild = new DictNode(part, false);
- newChild.parent = parent;
- if (parent != null) {
- int index = parent.children.indexOf(this);
- parent.addChild(index + 1, newChild);
- }
- return newChild;
- }
-
- public byte[] firstValue() {
- ByteArrayOutputStream bytes = new ByteArrayOutputStream();
- DictNode p = this;
- while (true) {
- bytes.write(p.part, 0, p.part.length);
- if (p.isEndOfValue || p.children.size() == 0) {
- break;
- }
- p = p.children.get(0);
- }
- return bytes.toByteArray();
- }
-
- public static DictNode splitNodeTree(DictNode splitNode) {
- if (splitNode == null) {
- return null;
- }
- DictNode current = splitNode;
- DictNode p = current.parent;
- while (p != null) {
- int index = p.children.indexOf(current);
- assert index != -1;
- DictNode newParent = p.duplicateNode();
- for (int i = p.children.size() - 1; i >= index; i--) {
- DictNode child = p.removeChild(i);
- newParent.addChild(0, child);
- }
- current = newParent;
- p = p.parent;
- }
- return current;
- }
-
- public static void mergeSingleByteNode(DictNode root, int leftOrRight) {
- DictNode current = root;
- DictNode child;
- while (!current.children.isEmpty()) {
- child = leftOrRight == 0 ? current.children.get(0) : current.children.get(current.children.size() - 1);
- if (current.children.size() > 1 || current.isEndOfValue) {
- current = child;
- continue;
- }
- byte[] newValue = new byte[current.part.length + child.part.length];
- System.arraycopy(current.part, 0, newValue, 0, current.part.length);
- System.arraycopy(child.part, 0, newValue, current.part.length, child.part.length);
- current.reset(newValue, child.isEndOfValue, child.children);
- current.id = child.id;
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- byte[] bytes = buildTrieBytes();
- out.write(bytes);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- DictNode root = DictSlice.rebuildNodeByDeserialize(in);
- this.clone(root);
- }
-
- protected byte[] buildTrieBytes() {
- Stats stats = Stats.stats(this);
- int sizeChildOffset = stats.mbpn_sizeChildOffset;
- int sizeId = stats.mbpn_sizeId;
-
- // write head
- byte[] head;
- try {
- ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
- DataOutputStream headOut = new DataOutputStream(byteBuf);
- headOut.write(AppendTrieDictionary.HEAD_MAGIC);
- headOut.writeShort(0); // head size, will back fill
- headOut.writeInt(stats.mbpn_footprint); // body size
- headOut.writeInt(stats.nValues);
- headOut.write(sizeChildOffset);
- headOut.write(sizeId);
- headOut.close();
- head = byteBuf.toByteArray();
- BytesUtil.writeUnsigned(head.length, head, AppendTrieDictionary.HEAD_SIZE_I, 2);
- } catch (IOException e) {
- throw new RuntimeException(e); // shall not happen, as we are
- }
-
- byte[] trieBytes = new byte[stats.mbpn_footprint + head.length];
- System.arraycopy(head, 0, trieBytes, 0, head.length);
-
- LinkedList<DictNode> open = new LinkedList<DictNode>();
- IdentityHashMap<DictNode, Integer> offsetMap = new IdentityHashMap<DictNode, Integer>();
-
- // write body
- int o = head.length;
- offsetMap.put(this, o);
- o = build_writeNode(this, o, true, sizeChildOffset, sizeId, trieBytes);
- if (this.children.isEmpty() == false)
- open.addLast(this);
-
- while (open.isEmpty() == false) {
- DictNode parent = open.removeFirst();
- build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes);
- for (int i = 0; i < parent.children.size(); i++) {
- DictNode c = parent.children.get(i);
- boolean isLastChild = (i == parent.children.size() - 1);
- offsetMap.put(c, o);
- o = build_writeNode(c, o, isLastChild, sizeChildOffset, sizeId, trieBytes);
- if (c.children.isEmpty() == false)
- open.addLast(c);
- }
- }
-
- if (o != trieBytes.length)
- throw new RuntimeException();
- return trieBytes;
- }
-
- private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) {
- int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE);
- BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset);
- trieBytes[parentOffset] |= flags;
- }
-
- private int build_writeNode(DictNode n, int offset, boolean isLastChild, int sizeChildOffset, int sizeId, byte[] trieBytes) {
- int o = offset;
-
- // childOffset
- if (isLastChild)
- trieBytes[o] |= TrieDictionary.BIT_IS_LAST_CHILD;
- if (n.isEndOfValue)
- trieBytes[o] |= TrieDictionary.BIT_IS_END_OF_VALUE;
- o += sizeChildOffset;
-
- // nValueBytes
- if (n.part.length > 255)
- throw new RuntimeException("Value length is " + n.part.length + " and larger than 255: " + Bytes.toStringBinary(n.part));
- BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1);
- o++;
-
- // valueBytes
- System.arraycopy(n.part, 0, trieBytes, o, n.part.length);
- o += n.part.length;
-
- if (n.isEndOfValue) {
- checkValidId(n.id);
- BytesUtil.writeUnsigned(n.id, trieBytes, o, sizeId);
- o += sizeId;
- }
-
- return o;
- }
-
- @Override
- public String toString() {
- return String.format("DictNode[root=%s, nodes=%d, firstValue=%s]", Bytes.toStringBinary(part), childrenCount, Bytes.toStringBinary(firstValue()));
- }
- }
-
- public static class Stats {
- public interface Visitor {
- void visit(DictNode n, int level);
- }
-
- private static void traverseR(DictNode node, Visitor visitor, int level) {
- visitor.visit(node, level);
- for (DictNode c : node.children)
- traverseR(c, visitor, level + 1);
- }
-
- private static void traversePostOrderR(DictNode node, Visitor visitor, int level) {
- for (DictNode c : node.children)
- traversePostOrderR(c, visitor, level + 1);
- visitor.visit(node, level);
- }
-
- public int nValues; // number of values in total
- public int nValueBytesPlain; // number of bytes for all values
- // uncompressed
- public int nValueBytesCompressed; // number of values bytes in Trie
- // (compressed)
- public int maxValueLength; // size of longest value in bytes
-
- // the trie is multi-byte-per-node
- public int mbpn_nNodes; // number of nodes in trie
- public int mbpn_trieDepth; // depth of trie
- public int mbpn_maxFanOut; // the maximum no. children
- public int mbpn_nChildLookups; // number of child lookups during lookup
- // every value once
- public int mbpn_nTotalFanOut; // the sum of fan outs during lookup every
- // value once
- public int mbpn_sizeValueTotal; // the sum of value space in all nodes
- public int mbpn_sizeNoValueBytes; // size of field noValueBytes
- public int mbpn_sizeChildOffset; // size of field childOffset, points to
- // first child in flattened array
- public int mbpn_sizeId; // size of id value, always be 4
- public int mbpn_footprint; // MBPN footprint in bytes
-
- /**
- * out print some statistics of the trie and the dictionary built from it
- */
- public static Stats stats(DictNode root) {
- // calculate nEndValueBeneath
- traversePostOrderR(root, new Visitor() {
- @Override
- public void visit(DictNode n, int level) {
- n.nValuesBeneath = n.isEndOfValue ? 1 : 0;
- for (DictNode c : n.children)
- n.nValuesBeneath += c.nValuesBeneath;
- }
- }, 0);
-
- // run stats
- final Stats s = new Stats();
- final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>();
- traverseR(root, new Visitor() {
- @Override
- public void visit(DictNode n, int level) {
- if (n.isEndOfValue)
- s.nValues++;
- s.nValueBytesPlain += n.part.length * n.nValuesBeneath;
- s.nValueBytesCompressed += n.part.length;
- s.mbpn_nNodes++;
- if (s.mbpn_trieDepth < level + 1)
- s.mbpn_trieDepth = level + 1;
- if (n.children.size() > 0) {
- if (s.mbpn_maxFanOut < n.children.size())
- s.mbpn_maxFanOut = n.children.size();
- int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0);
- s.mbpn_nChildLookups += childLookups;
- s.mbpn_nTotalFanOut += childLookups * n.children.size();
- }
-
- if (level < lenAtLvl.size())
- lenAtLvl.set(level, n.part.length);
- else
- lenAtLvl.add(n.part.length);
- int lenSoFar = 0;
- for (int i = 0; i <= level; i++)
- lenSoFar += lenAtLvl.get(i);
- if (lenSoFar > s.maxValueLength)
- s.maxValueLength = lenSoFar;
- }
- }, 0);
-
- // flatten trie footprint calculation, case of Multi-Byte-Per-DictNode
- s.mbpn_sizeId = 4;
- s.mbpn_sizeValueTotal = s.nValueBytesCompressed + s.nValues * s.mbpn_sizeId;
- s.mbpn_sizeNoValueBytes = 1;
- s.mbpn_sizeChildOffset = 5;
- s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset);
- while (true) { // minimize the offset size to match the footprint
- int t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset - 1);
- // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag
- // expand t to long before *4, avoiding exceed Integer.MAX_VALUE
- if (BytesUtil.sizeForValue((long) t * 4) <= s.mbpn_sizeChildOffset - 1) {
- s.mbpn_sizeChildOffset--;
- s.mbpn_footprint = t;
- } else
- break;
- }
-
- return s;
- }
-
- /**
- * out print trie for debug
- */
- public void print(DictNode root) {
- print(root, System.out);
- }
-
- public void print(DictNode root, final PrintStream out) {
- traverseR(root, new Visitor() {
- @Override
- public void visit(DictNode n, int level) {
- try {
- for (int i = 0; i < level; i++)
- out.print(" ");
- out.print(new String(n.part, "UTF-8"));
- out.print(" - ");
- if (n.nValuesBeneath > 0)
- out.print(n.nValuesBeneath);
- if (n.isEndOfValue)
- out.print("* [" + n.id + "]");
- out.print("\n");
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- }
- }, 0);
- }
- }
-
- public static class Builder<T> {
- private static ConcurrentHashMap<String, Pair<Integer, Builder>> builderInstanceAndCountMap = new ConcurrentHashMap();
-
- public static Builder getInstance(String resourcePath) throws IOException {
- return getInstance(resourcePath, null);
- }
-
- public synchronized static Builder getInstance(String resourcePath, AppendTrieDictionary dict) throws IOException {
- Pair<Integer, Builder> entry = builderInstanceAndCountMap.get(resourcePath);
- if (entry == null) {
- entry = new Pair<>(0, createNewBuilder(resourcePath, dict));
- builderInstanceAndCountMap.put(resourcePath, entry);
- }
- entry.setFirst(entry.getFirst() + 1);
- return entry.getSecond();
- }
-
- // return true if entry still in map
- private synchronized static boolean releaseInstance(String resourcePath) {
- Pair<Integer, Builder> entry = builderInstanceAndCountMap.get(resourcePath);
- if (entry != null) {
- entry.setFirst(entry.getFirst() - 1);
- if (entry.getFirst() <= 0) {
- builderInstanceAndCountMap.remove(resourcePath);
- return false;
- }
- return true;
- }
- return false;
- }
-
- public static Builder createNewBuilder(String resourcePath, AppendTrieDictionary existDict) throws IOException {
- String dictDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + resourcePath + "/";
-
- AppendTrieDictionary dictToUse = existDict;
- if (dictToUse == null) {
- // Try to load the existing dict from cache, making sure there's only the same one object in memory
- NavigableSet<String> dicts = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().listResources(resourcePath);
- ArrayList<String> appendDicts = new ArrayList<>();
- if (dicts != null && !dicts.isEmpty()) {
- for (String dict : dicts) {
- DictionaryInfo info = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().getResource(dict, DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER);
- if (info.getDictionaryClass().equals(AppendTrieDictionary.class.getName())) {
- appendDicts.add(dict);
- }
- }
- }
- if (appendDicts.isEmpty()) {
- dictToUse = null;
- } else if (appendDicts.size() == 1) {
- dictToUse = (AppendTrieDictionary) DictionaryManager.getInstance(KylinConfig.getInstanceFromEnv()).getDictionary(appendDicts.get(0));
- } else {
- throw new IllegalStateException(String.format("GlobalDict %s should have 0 or 1 append dict but %d", resourcePath, appendDicts.size()));
- }
- }
-
- AppendTrieDictionary.Builder<String> builder;
- if (dictToUse == null) {
- logger.info("GlobalDict {} is empty, create new one", resourcePath);
- builder = new Builder<>(resourcePath, null, dictDir, 0, 0, 0, new StringBytesConverter(), null);
- } else {
- logger.info("GlobalDict {} exist, append value", resourcePath);
- builder = new Builder<>(resourcePath, dictToUse, dictToUse.baseDir, dictToUse.maxId, dictToUse.maxValueLength, dictToUse.nValues, dictToUse.bytesConvert, dictToUse.writeDictMap());
- }
-
- return builder;
- }
-
- private final String resourcePath;
- private final String baseDir;
- private int maxId;
- private int maxValueLength;
- private int nValues;
- private final BytesConverter<T> bytesConverter;
-
- private final AppendTrieDictionary dict;
-
- private final TreeMap<DictSliceKey, DictNode> mutableDictSliceMap;
- private int MAX_ENTRY_IN_SLICE = 10_000_000;
- private static final double MAX_ENTRY_OVERHEAD_FACTOR = 1.0;
-
- private int processedCount = 0;
-
- // Constructor for a new Dict
- private Builder(String resourcePath, AppendTrieDictionary dict, String baseDir, int maxId, int maxValueLength, int nValues, BytesConverter<T> bytesConverter, byte[] dictMapBytes) throws IOException {
- this.resourcePath = resourcePath;
- if (dict == null) {
- this.dict = new AppendTrieDictionary<T>();
- } else {
- this.dict = dict;
- }
- this.baseDir = baseDir;
- this.maxId = maxId;
- this.maxValueLength = maxValueLength;
- this.nValues = nValues;
- this.bytesConverter = bytesConverter;
-
- MAX_ENTRY_IN_SLICE = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
- int maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions();
- long versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL();
- // create a new cached map with baseDir
- mutableDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(1).baseDir(baseDir).maxVersions(maxVersions).versionTTL(versionTTL).keyClazz(DictSliceKey.class).valueClazz(DictNode.class).immutable(false).build();
- if (dictMapBytes != null) {
- ((Writable) mutableDictSliceMap).readFields(new DataInputStream(new ByteArrayInputStream(dictMapBytes)));
- }
- }
-
- public void addValue(T value) {
- addValue(bytesConverter.convertToBytes(value));
- }
-
- private synchronized void addValue(byte[] value) {
- if (++processedCount % 1_000_000 == 0) {
- logger.debug("add value count " + processedCount);
- }
- maxValueLength = Math.max(maxValueLength, value.length);
-
- if (mutableDictSliceMap.isEmpty()) {
- DictNode root = new DictNode(new byte[0], false);
- mutableDictSliceMap.put(DictSliceKey.wrap(new byte[0]), root);
- }
- DictSliceKey sliceKey = mutableDictSliceMap.floorKey(DictSliceKey.wrap(value));
- if (sliceKey == null) {
- sliceKey = mutableDictSliceMap.firstKey();
- }
- DictNode root = mutableDictSliceMap.get(sliceKey);
- addValueR(root, value, 0);
- if (root.childrenCount > MAX_ENTRY_IN_SLICE * MAX_ENTRY_OVERHEAD_FACTOR) {
- mutableDictSliceMap.remove(sliceKey);
- DictNode newRoot = splitNodeTree(root);
- DictNode.mergeSingleByteNode(root, 1);
- DictNode.mergeSingleByteNode(newRoot, 0);
- mutableDictSliceMap.put(DictSliceKey.wrap(root.firstValue()), root);
- mutableDictSliceMap.put(DictSliceKey.wrap(newRoot.firstValue()), newRoot);
- }
- }
-
- private DictNode splitNodeTree(DictNode root) {
- DictNode parent = root;
- DictNode splitNode;
- int childCountToSplit = (int) (MAX_ENTRY_IN_SLICE * MAX_ENTRY_OVERHEAD_FACTOR / 2);
- while (true) {
- List<DictNode> children = parent.children;
- if (children.size() == 0) {
- splitNode = parent;
- break;
- } else if (children.size() == 1) {
- parent = children.get(0);
- continue;
- } else {
- for (int i = children.size() - 1; i >= 0; i--) {
- parent = children.get(i);
- if (childCountToSplit > children.get(i).childrenCount) {
- childCountToSplit -= children.get(i).childrenCount;
- } else {
- childCountToSplit--;
- break;
- }
- }
- }
- }
- return DictNode.splitNodeTree(splitNode);
- }
-
- private int createNextId() {
- int id = ++maxId;
- checkValidId(id);
- nValues++;
- return id;
- }
-
- // Only used for test
- public void setMaxId(int id) {
- this.maxId = id;
- }
-
- // When add a new node, the value part maybe over 255 bytes, need split it into a sub tree
- private DictNode addNodeMaybeOverflow(byte[] value, int start, int end) {
- DictNode head = null;
- DictNode current = null;
- for (; start + 255 < end; start += 255) {
- DictNode c = new DictNode(BytesUtil.subarray(value, start, start + 255), false);
- if (head == null) {
- head = c;
- current = c;
- } else {
- current.addChild(c);
- current = c;
- }
- }
- DictNode last = new DictNode(BytesUtil.subarray(value, start, end), true);
- last.id = createNextId();
- if (head == null) {
- head = last;
- } else {
- current.addChild(last);
- }
- return head;
- }
-
- private void addValueR(DictNode node, byte[] value, int start) {
- // match the value part of current node
- int i = 0, j = start;
- int n = node.part.length, nn = value.length;
- int comp = 0;
- for (; i < n && j < nn; i++, j++) {
- comp = BytesUtil.compareByteUnsigned(node.part[i], value[j]);
- if (comp != 0)
- break;
- }
-
- if (j == nn) {
- // if value fully matched within the current node
- if (i == n) {
- // if equals to current node, just mark end of value
- if (!node.isEndOfValue) {
- // if the first match, assign an Id to nodt
- node.id = createNextId();
- }
- node.isEndOfValue = true;
- } else {
- // otherwise, split the current node into two
- DictNode c = new DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
- c.id = node.id;
- node.reset(BytesUtil.subarray(node.part, 0, i), true);
- node.addChild(c);
- node.id = createNextId();
- }
- return;
- }
-
- // if partially matched the current, split the current node, add the new
- // value, make a 3-way
- if (i < n) {
- DictNode c1 = new DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
- c1.id = node.id;
- DictNode c2 = addNodeMaybeOverflow(value, j, nn);
- node.reset(BytesUtil.subarray(node.part, 0, i), false);
- if (comp < 0) {
- node.addChild(c1);
- node.addChild(c2);
- } else {
- node.addChild(c2);
- node.addChild(c1);
- }
- return;
- }
-
- // out matched the current, binary search the next byte for a child node
- // to continue
- byte lookfor = value[j];
- int lo = 0;
- int hi = node.children.size() - 1;
- int mid = 0;
- boolean found = false;
- comp = 0;
- while (!found && lo <= hi) {
- mid = lo + (hi - lo) / 2;
- DictNode c = node.children.get(mid);
- comp = BytesUtil.compareByteUnsigned(lookfor, c.part[0]);
- if (comp < 0)
- hi = mid - 1;
- else if (comp > 0)
- lo = mid + 1;
- else
- found = true;
- }
- if (found) {
- // found a child node matching the first byte, continue in that child
- addValueR(node.children.get(mid), value, j);
- } else {
- // otherwise, make the value a new child
- DictNode c = addNodeMaybeOverflow(value, j, nn);
- node.addChild(comp <= 0 ? mid : mid + 1, c);
- }
- }
-
- public synchronized AppendTrieDictionary<T> build(int baseId) throws IOException {
- boolean keepAppend = releaseInstance(resourcePath);
- CachedTreeMap dictSliceMap = (CachedTreeMap) mutableDictSliceMap;
- dict.initParams(baseDir, baseId, maxId, maxValueLength, nValues, bytesConverter);
- dict.flushIndex(dictSliceMap, keepAppend);
- dict.initDictSliceMap(dictSliceMap);
-
- return dict;
- }
+ final GlobalDictStore globalDictStore = new GlobalDictHDFSStore(baseDir);
+ Long[] versions = globalDictStore.listAllVersions();
+ checkState(versions.length > 0, "Global dict at %s is empty", baseDir);
+ final long latestVersion = versions[versions.length - 1];
+ final Path latestVersionPath = globalDictStore.getVersionDir(latestVersion);
+ this.metadata = globalDictStore.getMetadata(latestVersion);
+ this.bytesConvert = metadata.bytesConverter;
+ this.dictCache = CacheBuilder.newBuilder().softValues().removalListener(new RemovalListener<DictSliceKey, DictSlice>() {
+ @Override
+ public void onRemoval(RemovalNotification<DictSliceKey, DictSlice> notification) {
+ logger.info("Evict slice with key {} and value {} caused by {}, size {}/{}", notification.getKey(), notification.getValue(), notification.getCause(), dictCache.size(), metadata.sliceFileMap.size());
+ }
+ }).build(new CacheLoader<DictSliceKey, DictSlice>() {
+ @Override
+ public DictSlice load(DictSliceKey key) throws Exception {
+ DictSlice slice = globalDictStore.readSlice(latestVersionPath.toString(), metadata.sliceFileMap.get(key));
+ logger.info("Load slice with key {} and value {}", key, slice);
+ return slice;
+ }
+ });
}
@Override
protected int getIdFromValueBytesWithoutCache(byte[] value, int offset, int len, int roundingFlag) {
- if (dictSliceMap.isEmpty()) {
- return -1;
- }
- byte[] tempVal = new byte[len];
- System.arraycopy(value, offset, tempVal, 0, len);
- DictSliceKey sliceKey = dictSliceMap.floorKey(DictSliceKey.wrap(tempVal));
+ byte[] val = Arrays.copyOfRange(value, offset, offset + len);
+ DictSliceKey sliceKey = metadata.sliceFileMap.floorKey(DictSliceKey.wrap(val));
if (sliceKey == null) {
- sliceKey = dictSliceMap.firstKey();
+ sliceKey = metadata.sliceFileMap.firstKey();
}
- DictSlice slice = dictSliceMap.get(sliceKey);
- int id = slice.getIdFromValueBytesImpl(value, offset, len, roundingFlag);
- return id;
- }
-
- @Override
- protected byte[] getValueBytesFromIdWithoutCache(int id) {
- throw new UnsupportedOperationException("AppendTrieDictionary can't retrive value from id");
+ DictSlice slice;
+ try {
+ slice = dictCache.get(sliceKey);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Failed to load slice with key " + sliceKey, e.getCause());
+ }
+ return slice.getIdFromValueBytesImpl(value, offset, len, roundingFlag);
}
@Override
public int getMinId() {
- return baseId;
+ return metadata.baseId;
}
@Override
public int getMaxId() {
- return maxId;
+ return metadata.maxId;
}
@Override
public int getSizeOfId() {
- return 4;
+ return Integer.SIZE / Byte.SIZE;
}
@Override
public int getSizeOfValue() {
- return maxValueLength;
+ return metadata.maxValueLength;
}
- public void flushIndex(CachedTreeMap dictSliceMap, boolean keepAppend) throws IOException {
- try (FSDataOutputStream indexOut = dictSliceMap.openIndexOutput()) {
- indexOut.writeInt(baseId);
- indexOut.writeInt(maxId);
- indexOut.writeInt(maxValueLength);
- indexOut.writeInt(nValues);
- indexOut.writeUTF(bytesConvert.getClass().getName());
- dictSliceMap.write(indexOut);
- dictSliceMap.commit(keepAppend);
- }
+ @Override
+ protected byte[] getValueBytesFromIdWithoutCache(int id) {
+ throw new UnsupportedOperationException("AppendTrieDictionary can't retrieve value from id");
}
@Override
public AppendTrieDictionary copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException {
- //copy appendDict
- Path base = new Path(baseDir);
- FileSystem srcFs = HadoopUtil.getFileSystem(base);
- Path srcPath = CachedTreeMap.getLatestVersion(HadoopUtil.getCurrentConfiguration(), srcFs, base);
- Path dstPath = new Path(srcPath.toString().replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()));
- logger.info("Copy appendDict from {} to {}", srcPath, dstPath);
-
- FileSystem dstFs = HadoopUtil.getFileSystem(dstPath);
- if (dstFs.exists(dstPath)) {
- logger.info("Delete existing AppendDict {}", dstPath);
- dstFs.delete(dstPath, true);
- }
- FileUtil.copy(srcFs, srcPath, dstFs, dstPath, false, true, HadoopUtil.getCurrentConfiguration());
-
- // init new AppendTrieDictionary
+ GlobalDictStore store = new GlobalDictHDFSStore(baseDir);
+ String dstBaseDir = store.copyToAnotherMeta(srcConfig, dstConfig);
AppendTrieDictionary newDict = new AppendTrieDictionary();
- newDict.initParams(baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()), baseId, maxId, maxValueLength, nValues, bytesConvert);
- newDict.initDictSliceMap((CachedTreeMap) dictSliceMap);
-
+ newDict.init(dstBaseDir);
return newDict;
}
@@ -1192,34 +144,12 @@ public class AppendTrieDictionary<T> extends CacheDictionary<T> {
@Override
public void readFields(DataInput in) throws IOException {
- String baseDir = in.readUTF();
- Configuration conf = HadoopUtil.getCurrentConfiguration();
- try (FSDataInputStream input = CachedTreeMap.openLatestIndexInput(conf, baseDir)) {
- int baseId = input.readInt();
- int maxId = input.readInt();
- int maxValueLength = input.readInt();
- int nValues = input.readInt();
- String converterName = input.readUTF();
- BytesConverter converter = null;
- if (converterName.isEmpty() == false) {
- try {
- converter = ClassUtil.forName(converterName, BytesConverter.class).newInstance();
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- initParams(baseDir, baseId, maxId, maxValueLength, nValues, converter);
-
- // Create instance for deserialize data, and update to map in dict
- CachedTreeMap dictMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
- dictMap.readFields(input);
- initDictSliceMap(dictMap);
- }
+ init(in.readUTF());
}
@Override
public void dump(PrintStream out) {
- out.println("Total " + nValues + " values, " + (dictSliceMap == null ? 0 : dictSliceMap.size()) + " slice");
+ out.println(String.format("Total %d values and %d slices", metadata.nValues, metadata.sliceFileMap.size()));
}
@Override
@@ -1248,5 +178,4 @@ public class AppendTrieDictionary<T> extends CacheDictionary<T> {
public boolean contains(Dictionary other) {
return false;
}
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/bc6a1c3d/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
new file mode 100644
index 0000000..bfd664f
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import com.google.common.base.Preconditions;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.TreeMap;
+
+import static com.google.common.base.Preconditions.checkState;
+
+public class AppendTrieDictionaryBuilder {
+ private static final Logger logger = LoggerFactory.getLogger(AppendTrieDictionaryBuilder.class);
+
+ private final String baseDir;
+ private final String workingDir;
+ private final int maxEntriesPerSlice;
+
+ private GlobalDictStore store;
+ private int maxId;
+ private int maxValueLength;
+ private int nValues;
+ private BytesConverter bytesConverter;
+ private TreeMap<DictSliceKey, String> sliceFileMap = new TreeMap<>(); // slice key -> slice file name
+ private int counter;
+
+ private DictSliceKey curKey;
+ private DictNode curNode;
+
+ public AppendTrieDictionaryBuilder(String resourceDir, int maxEntriesPerSlice) throws IOException {
+ this.baseDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + resourceDir + "/";
+ this.workingDir = this.baseDir + "/working";
+ this.maxEntriesPerSlice = maxEntriesPerSlice;
+ init();
+ }
+
+ public synchronized void init() throws IOException {
+ this.store = new GlobalDictHDFSStore(baseDir);
+ store.prepareForWrite(workingDir);
+
+ Long[] versions = store.listAllVersions();
+
+ if (versions.length == 0) { // build dict for the first time
+ this.maxId = 0;
+ this.maxValueLength = 0;
+ this.nValues = 0;
+ this.bytesConverter = new StringBytesConverter();
+
+ } else { // append values to last version
+ GlobalDictMetadata metadata = store.getMetadata(versions[versions.length - 1]);
+ this.maxId = metadata.maxId;
+ this.maxValueLength = metadata.maxValueLength;
+ this.nValues = metadata.nValues;
+ this.bytesConverter = metadata.bytesConverter;
+ this.sliceFileMap = new TreeMap<>(metadata.sliceFileMap);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void addValue(String value) {
+ if (counter++ > 0 && counter % 1_000_000 == 0) {
+ logger.info("processed {} values", counter);
+ }
+
+ byte[] valueBytes = bytesConverter.convertToBytes(value);
+
+ if (sliceFileMap.isEmpty()) {
+ curNode = new DictNode(new byte[0], false);
+ sliceFileMap.put(DictSliceKey.START_KEY, null);
+ }
+ checkState(sliceFileMap.firstKey().equals(DictSliceKey.START_KEY), "first key should be \"\", but got \"%s\"", sliceFileMap.firstKey());
+
+ DictSliceKey nextKey = sliceFileMap.floorKey(DictSliceKey.wrap(valueBytes));
+
+ if (curKey != null && !nextKey.equals(curKey)) {
+ // you may suppose nextKey>=curKey, but nextKey<curKey could happen when a node splits.
+ // for example, suppose we have curNode [1-10], and add value "2" triggers split:
+ // first half [1-5] is flushed out, make second half [6-10] curNode and "6" curKey.
+ // then value "3" is added, now we got nextKey "1" smaller than curKey "6", surprise!
+ // in this case, we need to flush [6-10] and read [1-5] back.
+ flushCurrentNode();
+ curNode = null;
+ }
+ if (curNode == null) { // read next slice
+ DictSlice slice = store.readSlice(workingDir, sliceFileMap.get(nextKey));
+ curNode = slice.rebuildTrieTree();
+ }
+ curKey = nextKey;
+
+ addValueR(curNode, valueBytes, 0);
+
+ // split slice if it's too large
+ if (curNode.childrenCount > maxEntriesPerSlice) {
+ DictNode newRoot = splitNodeTree(curNode);
+ flushCurrentNode();
+ curNode = newRoot;
+ curKey = DictSliceKey.wrap(newRoot.firstValue());
+ sliceFileMap.put(curKey, null);
+ }
+
+ maxValueLength = Math.max(maxValueLength, valueBytes.length);
+ }
+
+ public AppendTrieDictionary build(int baseId) throws IOException {
+ if (curNode != null) {
+ flushCurrentNode();
+ }
+
+ GlobalDictMetadata metadata = new GlobalDictMetadata(baseId, this.maxId, this.maxValueLength, this.nValues, this.bytesConverter, sliceFileMap);
+ store.commit(workingDir, metadata);
+
+ AppendTrieDictionary dict = new AppendTrieDictionary();
+ dict.init(this.baseDir);
+ return dict;
+ }
+
+ private void flushCurrentNode() {
+ String newSliceFile = store.writeSlice(workingDir, curKey, curNode);
+ String oldSliceFile = sliceFileMap.put(curKey, newSliceFile);
+ if (oldSliceFile != null) {
+ store.deleteSlice(workingDir, oldSliceFile);
+ }
+ }
+
+ private void addValueR(DictNode node, byte[] value, int start) {
+ // match the value part of current node
+ int i = 0, j = start;
+ int n = node.part.length, nn = value.length;
+ int comp = 0;
+ for (; i < n && j < nn; i++, j++) {
+ comp = BytesUtil.compareByteUnsigned(node.part[i], value[j]);
+ if (comp != 0)
+ break;
+ }
+
+ if (j == nn) {
+ // if value fully matched within the current node
+ if (i == n) {
+ // on first match, mark end of value and assign an ID
+ if (!node.isEndOfValue) {
+ node.id = createNextId();
+ node.isEndOfValue = true;
+ }
+ } else {
+ // otherwise, split the current node into two
+ DictNode c = new DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
+ c.id = node.id;
+ node.reset(BytesUtil.subarray(node.part, 0, i), true);
+ node.addChild(c);
+ node.id = createNextId();
+ }
+ return;
+ }
+
+ // if partially matched the current, split the current node, add the new
+ // value, make a 3-way
+ if (i < n) {
+ DictNode c1 = new DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
+ c1.id = node.id;
+ DictNode c2 = addNodeMaybeOverflow(value, j, nn);
+ node.reset(BytesUtil.subarray(node.part, 0, i), false);
+ if (comp < 0) {
+ node.addChild(c1);
+ node.addChild(c2);
+ } else {
+ node.addChild(c2);
+ node.addChild(c1);
+ }
+ return;
+ }
+
+ // out matched the current, binary search the next byte for a child node
+ // to continue
+ byte lookfor = value[j];
+ int lo = 0;
+ int hi = node.children.size() - 1;
+ int mid = 0;
+ boolean found = false;
+ comp = 0;
+ while (!found && lo <= hi) {
+ mid = lo + (hi - lo) / 2;
+ DictNode c = node.children.get(mid);
+ comp = BytesUtil.compareByteUnsigned(lookfor, c.part[0]);
+ if (comp < 0)
+ hi = mid - 1;
+ else if (comp > 0)
+ lo = mid + 1;
+ else
+ found = true;
+ }
+ if (found) {
+ // found a child node matching the first byte, continue in that child
+ addValueR(node.children.get(mid), value, j);
+ } else {
+ // otherwise, make the value a new child
+ DictNode c = addNodeMaybeOverflow(value, j, nn);
+ node.addChild(comp <= 0 ? mid : mid + 1, c);
+ }
+ }
+
+ private int createNextId() {
+ int id = ++maxId;
+ checkValidId(id);
+ nValues++;
+ return id;
+ }
+
+ // The dict id starts from 1 to 2147483647 and 2147483648 to -2, leave 0 and -1 used for uninitialized state
+ private void checkValidId(int id) {
+ if (id == 0 || id == -1) {
+ throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294");
+ }
+ }
+
+ // When add a new node, the value part maybe over 255 bytes, need split it into a sub tree
+ private DictNode addNodeMaybeOverflow(byte[] value, int start, int end) {
+ DictNode head = null;
+ DictNode current = null;
+ for (; start + 255 < end; start += 255) {
+ DictNode c = new DictNode(BytesUtil.subarray(value, start, start + 255), false);
+ if (head == null) {
+ head = c;
+ current = c;
+ } else {
+ current.addChild(c);
+ current = c;
+ }
+ }
+ DictNode last = new DictNode(BytesUtil.subarray(value, start, end), true);
+ last.id = createNextId();
+ if (head == null) {
+ head = last;
+ } else {
+ current.addChild(last);
+ }
+ return head;
+ }
+
+ private DictNode splitNodeTree(DictNode root) {
+ DictNode parent = root;
+ int childCountToSplit = (int) (maxEntriesPerSlice * 1.0 / 2);
+ while (true) {
+ List<DictNode> children = parent.children;
+ if (children.size() == 0) {
+ break;
+ }
+ if (children.size() == 1) {
+ parent = children.get(0);
+ } else {
+ for (int i = children.size() - 1; i >= 0; i--) {
+ parent = children.get(i);
+ if (childCountToSplit > children.get(i).childrenCount) {
+ childCountToSplit -= children.get(i).childrenCount;
+ } else {
+ childCountToSplit--;
+ break;
+ }
+ }
+ }
+ }
+ return DictNode.splitNodeTree(parent);
+ }
+
+ // Only used for test
+ void setMaxId(int id) {
+ this.maxId = id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/bc6a1c3d/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
index 4b3817a..b7c39fa 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
+import static org.apache.kylin.dict.GlobalDictHDFSStore.BUFFER_SIZE;
+
/**
* Created by sunyerui on 16/11/15.
*/
@@ -67,16 +69,15 @@ public class AppendTrieDictionaryChecker {
listDictSlicePath(fs, status, list);
}
} else {
- if (path.getPath().getName().startsWith(CachedTreeMap.CACHED_PREFIX)) {
+ if (path.getPath().getName().startsWith(GlobalDictHDFSStore.IndexFormatV1.SLICE_PREFIX)) {
list.add(path.getPath());
}
}
}
public boolean doCheck(FileSystem fs, Path filePath) {
- try (FSDataInputStream input = fs.open(filePath, CachedTreeMap.BUFFER_SIZE)) {
- AppendTrieDictionary.DictSlice slice = new AppendTrieDictionary.DictSlice();
- slice.readFields(input);
+ try (FSDataInputStream input = fs.open(filePath, BUFFER_SIZE)) {
+ DictSlice slice = DictSlice.deserializeFrom(input);
return slice.doCheck();
} catch (Exception e) {
return false;
http://git-wip-us.apache.org/repos/asf/kylin/blob/bc6a1c3d/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
deleted file mode 100644
index ee69df7..0000000
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
+++ /dev/null
@@ -1,481 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.AbstractCollection;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-
-/**
- * Created by sunyerui on 16/5/2.
- * TODO Depends on HDFS for now, ideally just depends on storage interface
- */
-public class CachedTreeMap<K extends WritableComparable, V extends Writable> extends TreeMap<K, V> implements Writable {
- private static final Logger logger = LoggerFactory.getLogger(CachedTreeMap.class);
-
- private final Class<K> keyClazz;
- private final Class<V> valueClazz;
- transient volatile Collection<V> values;
- private final LoadingCache<K, V> valueCache;
- private final Configuration conf;
- private final Path baseDir;
- private final Path versionDir;
- private final Path workingDir;
- private final FileSystem fs;
- private final boolean immutable;
- private final int maxVersions;
- private final long versionTTL;
- private boolean keepAppend;
-
- public static final int BUFFER_SIZE = 8 * 1024 * 1024;
-
- public static final String CACHED_PREFIX = "cached_";
- public static final String VERSION_PREFIX = "version_";
-
- public static class CachedTreeMapBuilder<K, V> {
- private Class<K> keyClazz;
- private Class<V> valueClazz;
- private int maxCount = 8;
- private String baseDir;
- private boolean immutable;
- private int maxVersions;
- private long versionTTL;
-
- public static CachedTreeMapBuilder newBuilder() {
- return new CachedTreeMapBuilder();
- }
-
- private CachedTreeMapBuilder() {
- }
-
- public CachedTreeMapBuilder keyClazz(Class<K> clazz) {
- this.keyClazz = clazz;
- return this;
- }
-
- public CachedTreeMapBuilder valueClazz(Class<V> clazz) {
- this.valueClazz = clazz;
- return this;
- }
-
- public CachedTreeMapBuilder<K, V> maxSize(int maxCount) {
- this.maxCount = maxCount;
- return this;
- }
-
- public CachedTreeMapBuilder<K, V> baseDir(String baseDir) {
- this.baseDir = baseDir;
- return this;
- }
-
- public CachedTreeMapBuilder<K, V> immutable(boolean immutable) {
- this.immutable = immutable;
- return this;
- }
-
- public CachedTreeMapBuilder<K, V> maxVersions(int maxVersions) {
- this.maxVersions = maxVersions;
- return this;
- }
-
- public CachedTreeMapBuilder<K, V> versionTTL(long versionTTL) {
- this.versionTTL = versionTTL;
- return this;
- }
-
- public CachedTreeMap build() throws IOException {
- if (baseDir == null) {
- throw new RuntimeException("CachedTreeMap need a baseDir to cache data");
- }
- if (keyClazz == null || valueClazz == null) {
- throw new RuntimeException("CachedTreeMap need key and value clazz to serialize data");
- }
- CachedTreeMap map = new CachedTreeMap(maxCount, keyClazz, valueClazz, baseDir, immutable, maxVersions, versionTTL);
- return map;
- }
- }
-
- private CachedTreeMap(int maxCount, Class<K> keyClazz, Class<V> valueClazz, String basePath,
- boolean immutable, int maxVersions, long versionTTL) throws IOException {
- super();
- this.keyClazz = keyClazz;
- this.valueClazz = valueClazz;
- this.immutable = immutable;
- this.keepAppend = true;
- this.maxVersions = maxVersions;
- this.versionTTL = versionTTL;
- this.conf = HadoopUtil.getCurrentConfiguration();
- if (basePath.endsWith("/")) {
- basePath = basePath.substring(0, basePath.length()-1);
- }
- this.baseDir = new Path(basePath);
- this.fs = HadoopUtil.getFileSystem(baseDir, conf);
- if (!fs.exists(baseDir)) {
- fs.mkdirs(baseDir);
- }
- this.versionDir = getLatestVersion(conf, fs, baseDir);
- this.workingDir = new Path(baseDir, "working");
- if (!this.immutable) {
- // For mutable map, copy all data into working dir and work on it, avoiding suddenly server crash made data corrupt
- if (fs.exists(workingDir)) {
- fs.delete(workingDir, true);
- }
- FileUtil.copy(fs, versionDir, fs, workingDir, false, true, conf);
- }
- CacheBuilder builder = CacheBuilder.newBuilder().removalListener(new RemovalListener<K, V>() {
- @Override
- public void onRemoval(RemovalNotification<K, V> notification) {
- logger.info(String.format("Evict cache key %s(%d) with value %s caused by %s, size %d/%d ", notification.getKey(), notification.getKey().hashCode(), notification.getValue(), notification.getCause(), size(), valueCache.size()));
- switch (notification.getCause()) {
- case SIZE:
- writeValue(notification.getKey(), notification.getValue());
- break;
- case EXPLICIT:
- deleteValue(notification.getKey());
- break;
- default:
- }
- }
- });
- if (this.immutable) {
- // For immutable values, load all values as much as possible, and evict by soft reference to free memory when gc
- builder.softValues();
- } else {
- builder.maximumSize(maxCount);
- }
- this.valueCache = builder.build(new CacheLoader<K, V>() {
- @Override
- public V load(K key) throws Exception {
- V value = readValue(key);
- logger.info(String.format("Load cache by key %s(%d) with value %s", key, key.hashCode(), value));
- return value;
- }
- });
- }
-
- private String generateFileName(K key) {
- String file = getCurrentDir() + "/" + CACHED_PREFIX + key.toString();
- return file;
- }
-
- private String getCurrentDir() {
- return immutable ? versionDir.toString() : workingDir.toString();
- }
-
- private static String[] listAllVersions(FileSystem fs, Path baseDir) throws IOException {
- FileStatus[] fileStatus = fs.listStatus(baseDir, new PathFilter() {
- @Override
- public boolean accept(Path path) {
- if (path.getName().startsWith(VERSION_PREFIX)) {
- return true;
- }
- return false;
- }
- });
- TreeSet<String> versions = new TreeSet<>();
- for (FileStatus status : fileStatus) {
- versions.add(status.getPath().toString());
- }
- return versions.toArray(new String[versions.size()]);
- }
-
- // only for test
- public String getLatestVersion() throws IOException {
- return getLatestVersion(conf, fs, baseDir).toUri().getPath();
- }
-
- public static Path getLatestVersion(Configuration conf, FileSystem fs, Path baseDir) throws IOException {
- String[] versions = listAllVersions(fs, baseDir);
- if (versions.length > 0) {
- return new Path(versions[versions.length - 1]);
- } else {
- // Old format, directly use base dir, convert to new format
- Path newVersionDir = new Path(baseDir, VERSION_PREFIX + System.currentTimeMillis());
- Path tmpNewVersionDir = new Path(baseDir, "tmp_" + VERSION_PREFIX + System.currentTimeMillis());
- Path indexFile = new Path(baseDir, ".index");
- FileStatus[] cachedFiles;
- try {
- cachedFiles = fs.listStatus(baseDir, new PathFilter() {
- @Override
- public boolean accept(Path path) {
- if (path.getName().startsWith(CACHED_PREFIX)) {
- return true;
- }
- return false;
- }
- });
- fs.mkdirs(tmpNewVersionDir);
- if (fs.exists(indexFile) && cachedFiles.length > 0) {
- FileUtil.copy(fs, indexFile, fs, tmpNewVersionDir, false, true, conf);
- for (FileStatus file : cachedFiles) {
- FileUtil.copy(fs, file.getPath(), fs, tmpNewVersionDir, false, true, conf);
- }
- }
- fs.rename(tmpNewVersionDir, newVersionDir);
- if (fs.exists(indexFile) && cachedFiles.length > 0) {
- fs.delete(indexFile, true);
- for (FileStatus file : cachedFiles) {
- fs.delete(file.getPath(), true);
- }
- }
- } finally {
- if (fs.exists(tmpNewVersionDir)) {
- fs.delete(tmpNewVersionDir, true);
- }
- }
- return newVersionDir;
- }
- }
-
- public void commit(boolean keepAppend) throws IOException {
- assert this.keepAppend && !immutable : "Only support commit method with immutable false and keepAppend true";
-
- Path newVersionDir = new Path(baseDir, VERSION_PREFIX + System.currentTimeMillis());
- if (keepAppend) {
- // Copy to tmp dir, and rename to new version, make sure it's complete when be visible
- Path tmpNewVersionDir = new Path(baseDir, "tmp_" + VERSION_PREFIX + System.currentTimeMillis());
- try {
- FileUtil.copy(fs, workingDir, fs, tmpNewVersionDir, false, true, conf);
- fs.rename(tmpNewVersionDir, newVersionDir);
- } finally {
- if (fs.exists(tmpNewVersionDir)) {
- fs.delete(tmpNewVersionDir, true);
- }
- }
- } else {
- fs.rename(workingDir, newVersionDir);
- }
- this.keepAppend = keepAppend;
-
- // Check versions count, delete expired versions
- String[] versions = listAllVersions(fs, baseDir);
- long timestamp = System.currentTimeMillis();
- for (int i = 0; i < versions.length - maxVersions; i++) {
- String versionString = versions[i].substring(versions[i].lastIndexOf(VERSION_PREFIX) + VERSION_PREFIX.length());
- long version = Long.parseLong(versionString);
- if (version + versionTTL < timestamp) {
- fs.delete(new Path(versions[i]), true);
- }
- }
- }
-
- public void loadEntry(CachedTreeMap other) {
- for (Object key : other.keySet()) {
- super.put((K)key, null);
- }
- }
-
- private void writeValue(K key, V value) {
- if (immutable) {
- return;
- }
- String fileName = generateFileName(key);
- Path filePath = new Path(fileName);
- try (FSDataOutputStream out = fs.create(filePath, true, BUFFER_SIZE, (short) 5, BUFFER_SIZE * 8L)) {
- value.write(out);
- } catch (Exception e) {
- logger.error(String.format("write value into %s exception: %s", fileName, e), e);
- throw new RuntimeException(e.getCause());
- }
- }
-
- private V readValue(K key) throws Exception {
- String fileName = generateFileName(key);
- Path filePath = new Path(fileName);
- try (FSDataInputStream input = fs.open(filePath, BUFFER_SIZE)) {
- V value = valueClazz.newInstance();
- value.readFields(input);
- return value;
- } catch (Exception e) {
- logger.error(String.format("read value from %s exception: %s", fileName, e), e);
- return null;
- }
- }
-
- private void deleteValue(K key) {
- if (immutable) {
- return;
- }
- String fileName = generateFileName(key);
- Path filePath = new Path(fileName);
- try {
- if (fs.exists(filePath)) {
- fs.delete(filePath, true);
- }
- } catch (Exception e) {
- logger.error(String.format("delete value file %s exception: %s", fileName, e), e);
- }
- }
-
- @Override
- public V put(K key, V value) {
- assert keepAppend && !immutable : "Only support put method with immutable false and keepAppend true";
- super.put(key, null);
- valueCache.put(key, value);
- return null;
- }
-
- @Override
- public V get(Object key) {
- if (super.containsKey(key)) {
- try {
- return valueCache.get((K) key);
- } catch (ExecutionException e) {
- logger.error(String.format("get value with key %s exception: %s", key, e), e);
- return null;
- }
- } else {
- return null;
- }
- }
-
- @Override
- public V remove(Object key) {
- assert keepAppend && !immutable : "Only support remove method with immutable false keepAppend true";
- super.remove(key);
- valueCache.invalidate(key);
- return null;
- }
-
- @Override
- public void clear() {
- super.clear();
- values = null;
- valueCache.invalidateAll();
- }
-
- public Collection<V> values() {
- Collection<V> vs = values;
- return (vs != null) ? vs : (values = new Values());
- }
-
- class Values extends AbstractCollection<V> {
- @Override
- public Iterator<V> iterator() {
- return new ValueIterator<>();
- }
-
- @Override
- public int size() {
- return CachedTreeMap.this.size();
- }
- }
-
- class ValueIterator<V> implements Iterator<V> {
- Iterator<K> keyIterator;
- K currentKey;
-
- public ValueIterator() {
- keyIterator = CachedTreeMap.this.keySet().iterator();
- }
-
- @Override
- public boolean hasNext() {
- return keyIterator.hasNext();
- }
-
- @Override
- public V next() {
- currentKey = keyIterator.next();
- try {
- return (V) valueCache.get(currentKey);
- } catch (ExecutionException e) {
- logger.error(String.format("get value with key %s exception: %s", currentKey, e), e);
- return null;
- }
- }
-
- @Override
- public void remove() {
- assert keepAppend && !immutable : "Only support remove method with immutable false and keepAppend true";
- keyIterator.remove();
- valueCache.invalidate(currentKey);
- }
- }
-
- public FSDataOutputStream openIndexOutput() throws IOException {
- assert keepAppend && !immutable : "Only support write method with immutable false and keepAppend true";
- Path indexPath = new Path(getCurrentDir(), ".index");
- return fs.create(indexPath, true, 8 * 1024 * 1024, (short) 5, 8 * 1024 * 1024 * 8);
- }
-
- public FSDataInputStream openIndexInput() throws IOException {
- Path indexPath = new Path(getCurrentDir(), ".index");
- return fs.open(indexPath, 8 * 1024 * 1024);
- }
-
- public static FSDataInputStream openLatestIndexInput(Configuration conf, String baseDir) throws IOException {
- Path basePath = new Path(baseDir);
- FileSystem fs = HadoopUtil.getFileSystem(basePath, conf);
- Path indexPath = new Path(getLatestVersion(conf, fs, basePath), ".index");
- return fs.open(indexPath, 8 * 1024 * 1024);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(size());
- for (K key : keySet()) {
- key.write(out);
- V value = valueCache.getIfPresent(key);
- if (null != value) {
- writeValue(key, value);
- }
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int size = in.readInt();
- try {
- for (int i = 0; i < size; i++) {
- K key = keyClazz.newInstance();
- key.readFields(in);
- super.put(key, null);
- }
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-}
[5/7] kylin git commit: KYLIN-2506 Refactor
ZookeeperDistributedJobLock
Posted by li...@apache.org.
KYLIN-2506 Refactor ZookeeperDistributedJobLock
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1a8e2573
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1a8e2573
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1a8e2573
Branch: refs/heads/master-KYLIN-2506
Commit: 1a8e25733c64fd6eb5580cdf898e2baf516a776d
Parents: bc6a1c3
Author: kangkaisen <ka...@163.com>
Authored: Fri Apr 7 15:45:43 2017 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Apr 16 17:12:50 2017 +0800
----------------------------------------------------------------------
core-common/pom.xml | 5 +
.../kylin/common/lock/DistributedJobLock.java | 38 +++++
.../org/apache/kylin/common/lock/JobLock.java | 26 +++
.../apache/kylin/common/lock/MockJobLock.java | 33 ++++
.../kylin/dict/AppendTrieDictionaryBuilder.java | 1 -
.../java/org/apache/kylin/job/Scheduler.java | 2 +-
.../job/impl/threadpool/DefaultScheduler.java | 2 +-
.../impl/threadpool/DistributedScheduler.java | 43 +++--
.../kylin/job/lock/DistributedJobLock.java | 36 ----
.../java/org/apache/kylin/job/lock/JobLock.java | 27 ---
.../org/apache/kylin/job/lock/MockJobLock.java | 33 ----
.../job/impl/threadpool/BaseSchedulerTest.java | 2 +-
.../test_case_data/localmeta/kylin.properties | 4 +-
.../test_case_data/sandbox/kylin.properties | 2 +-
.../kylin/job/BaseTestDistributedScheduler.java | 4 +-
.../apache/kylin/rest/service/JobService.java | 2 +-
.../hbase/util/ZookeeperDistributedJobLock.java | 164 +++++++++----------
.../storage/hbase/util/ZookeeperJobLock.java | 2 +-
18 files changed, 219 insertions(+), 207 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 95d3c29..5b5f78b 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -69,6 +69,11 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
new file mode 100644
index 0000000..00d1ca4
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.lock;
+
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+
+import java.util.concurrent.Executor;
+
+public interface DistributedJobLock extends JobLock {
+
+ boolean lockWithClient(String lockPath, String lockClient);
+
+ boolean isHasLocked(String lockPath);
+
+ void unlock(String lockPath);
+
+ PathChildrenCache watch(String watchPath, Executor watchExecutor, WatcherProcess process);
+
+ public interface WatcherProcess {
+ void process(String path, String data);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
new file mode 100644
index 0000000..5802d71
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.lock;
+
+
+public interface JobLock {
+ boolean lock();
+
+ void unlock();
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
new file mode 100644
index 0000000..f8233be
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.lock;
+
+/**
+ */
+public class MockJobLock implements JobLock {
+ @Override
+ public boolean lock() {
+ return true;
+ }
+
+ @Override
+ public void unlock() {
+ return;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
index bfd664f..c35a815 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
@@ -18,7 +18,6 @@
package org.apache.kylin.dict;
-import com.google.common.base.Preconditions;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.BytesUtil;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
index 93d2510..e2cfd44 100644
--- a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
@@ -21,7 +21,7 @@ package org.apache.kylin.job;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.common.lock.JobLock;
/**
*/
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 403abc4..688708e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -40,7 +40,7 @@ import org.apache.kylin.job.execution.Executable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.common.lock.JobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 1f2e958..b99da7c 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -18,6 +18,7 @@
package org.apache.kylin.job.impl.threadpool;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
@@ -33,6 +34,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.kylin.common.KylinConfig;
@@ -48,8 +50,8 @@ import org.apache.kylin.job.execution.Executable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.lock.DistributedJobLock;
-import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.common.lock.DistributedJobLock;
+import org.apache.kylin.common.lock.JobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +73,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
private ExecutorService jobPool;
private DefaultContext context;
private DistributedJobLock jobLock;
+ private PathChildrenCache lockWatch;
private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
private static final ConcurrentMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>();
@@ -81,6 +84,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
private JobEngineConfig jobEngineConfig;
private final static String SEGMENT_ID = "segmentId";
+ public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
//only for it test
public static DistributedScheduler getInstance(KylinConfig config) {
@@ -177,7 +181,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
public void run() {
try (SetThreadName ignored = new SetThreadName("Job %s", executable.getId())) {
String segmentId = executable.getParam(SEGMENT_ID);
- if (jobLock.lockWithName(segmentId, serverName)) {
+ if (jobLock.lockWithClient(getLockPath(segmentId), serverName)) {
logger.info(executable.toString() + " scheduled in server: " + serverName);
context.addRunningJob(executable);
@@ -205,7 +209,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
if (state != ExecutableState.READY && state != ExecutableState.RUNNING) {
if (segmentWithLocks.contains(segmentId)) {
logger.info(executable.toString() + " will release the lock for the segment: " + segmentId);
- jobLock.unlockWithName(segmentId);
+ jobLock.unlock(getLockPath(segmentId));
segmentWithLocks.remove(segmentId);
}
}
@@ -214,15 +218,15 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
}
//when the segment lock released but the segment related job still running, resume the job.
- private class DoWatchImpl implements org.apache.kylin.job.lock.DistributedJobLock.DoWatchLock {
+ private class WatcherProcessImpl implements org.apache.kylin.common.lock.DistributedJobLock.WatcherProcess {
private String serverName;
- public DoWatchImpl(String serverName) {
+ public WatcherProcessImpl(String serverName) {
this.serverName = serverName;
}
@Override
- public void doWatch(String path, String nodeData) {
+ public void process(String path, String nodeData) {
String[] paths = path.split("/");
String segmentId = paths[paths.length - 1];
@@ -233,7 +237,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
if (executable instanceof DefaultChainedExecutable && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) && !nodeData.equalsIgnoreCase(serverName)) {
try {
logger.warn(nodeData + " has released the lock for: " + segmentId + " but the job still running. so " + serverName + " resume the job");
- if (!jobLock.isHasLocked(segmentId)) {
+ if (!jobLock.isHasLocked(getLockPath(segmentId))) {
executableManager.resumeRunningJobForce(executable.getId());
fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
break;
@@ -283,8 +287,8 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
//watch the zookeeper node change, so that when one job server is down, other job servers can take over.
watchPool = Executors.newFixedThreadPool(1);
- DoWatchImpl doWatchImpl = new DoWatchImpl(this.serverName);
- this.jobLock.watchLock(watchPool, doWatchImpl);
+ WatcherProcessImpl watcherProcess = new WatcherProcessImpl(this.serverName);
+ lockWatch = this.jobLock.watch(getWatchPath(), watchPool, watcherProcess);
int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
@@ -314,16 +318,27 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
}
}
+ private String getLockPath(String pathName) {
+ return ZOOKEEPER_LOCK_PATH + "/" + jobEngineConfig.getConfig().getMetadataUrlPrefix() + "/" + pathName;
+ }
+
+ private String getWatchPath() {
+ return ZOOKEEPER_LOCK_PATH + "/" + jobEngineConfig.getConfig().getMetadataUrlPrefix();
+ }
+
@Override
public void shutdown() throws SchedulerException {
logger.info("Will shut down Job Engine ....");
+ try {
+ lockWatch.close();
+ } catch (IOException e) {
+ throw new SchedulerException(e);
+ }
+
releaseAllLocks();
logger.info("The all locks has released");
- watchPool.shutdown();
- logger.info("The watchPool has down");
-
fetcherPool.shutdown();
logger.info("The fetcherPool has down");
@@ -333,7 +348,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
private void releaseAllLocks() {
for (String segmentId : segmentWithLocks) {
- jobLock.unlockWithName(segmentId);
+ jobLock.unlock(getLockPath(segmentId));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
deleted file mode 100644
index 1c173ec..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.lock;
-
-import java.util.concurrent.ExecutorService;
-
-public interface DistributedJobLock extends JobLock {
-
- boolean lockWithName(String name, String serverName);
-
- boolean isHasLocked(String segmentId);
-
- void unlockWithName(String name);
-
- void watchLock(ExecutorService pool, DoWatchLock doWatch);
-
- public interface DoWatchLock {
- void doWatch(String path, String data);
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
deleted file mode 100644
index bbfb801..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.lock;
-
-/**
- */
-public interface JobLock {
- boolean lock();
-
- void unlock();
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
deleted file mode 100644
index cac17b9..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.lock;
-
-/**
- */
-public class MockJobLock implements JobLock {
- @Override
- public boolean lock() {
- return true;
- }
-
- @Override
- public void unlock() {
- return;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index 1ada9a1..1bafa34 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -28,7 +28,7 @@ import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.lock.MockJobLock;
+import org.apache.kylin.common.lock.MockJobLock;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index 9f7b24c..3866575 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -133,7 +133,7 @@ kylin.security.saml.context-path=/kylin
kylin.test.bcc.new.key=some-value
kylin.engine.mr.config-override.test1=test1
kylin.engine.mr.config-override.test2=test2
-kylin.job.lock=org.apache.kylin.job.lock.MockJobLock
+kylin.job.lock=org.apache.kylin.common.lock.MockJobLock
kylin.engine.provider.0=org.apache.kylin.engine.mr.MRBatchCubingEngine
-kylin.engine.provider.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2
+kylin.engine.provider.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 684b4dd..c0a4968 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -112,7 +112,7 @@ kylin.query.udf.concat=org.apache.kylin.query.udf.ConcatUDF
kylin.query.udf.version=org.apache.kylin.query.udf.VersionUDF
# for test
-kylin.job.lock=org.apache.kylin.job.lock.MockJobLock
+kylin.job.lock=org.apache.kylin.common.lock.MockJobLock
kylin.engine.mr.uhc-reducer-count=3
### CUBE ###
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
index 2d79970..4877ca1 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -167,7 +167,7 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
}
boolean lock(ZookeeperDistributedJobLock jobLock, String cubeName, String serverName) {
- return jobLock.lockWithName(cubeName, serverName);
+ return jobLock.lockWithClient(getLockPath(cubeName), serverName);
}
private static void initZk() {
@@ -197,6 +197,6 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
}
private String getLockPath(String pathName) {
- return ZookeeperDistributedJobLock.ZOOKEEPER_LOCK_PATH + "/" + kylinConfig1.getMetadataUrlPrefix() + "/" + pathName;
+ return DistributedScheduler.ZOOKEEPER_LOCK_PATH + "/" + kylinConfig1.getMetadataUrlPrefix() + "/" + pathName;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 4ba426e..31d1ded 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -51,7 +51,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.common.lock.JobLock;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.rest.constant.Constant;
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
index 983bfd9..5f5a721 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -19,35 +19,30 @@
package org.apache.kylin.storage.hbase.util;
import java.nio.charset.Charset;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.lock.DistributedJobLock;
+import org.apache.kylin.common.lock.DistributedJobLock;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * the jobLock is specially used to support distributed scheduler.
- */
-
public class ZookeeperDistributedJobLock implements DistributedJobLock {
private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class);
- public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
-
- final private KylinConfig config;
- final CuratorFramework zkClient;
- final PathChildrenCache childrenCache;
+ private final KylinConfig config;
+ private static final ConcurrentMap<KylinConfig, CuratorFramework> CACHE = new ConcurrentHashMap<KylinConfig, CuratorFramework>();
+ private final CuratorFramework zkClient;
public ZookeeperDistributedJobLock() {
this(KylinConfig.getInstanceFromEnv());
@@ -57,16 +52,12 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
this.config = config;
String zkConnectString = ZookeeperUtil.getZKConnectString();
- logger.info("zk connection string:" + zkConnectString);
if (StringUtils.isEmpty(zkConnectString)) {
throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
}
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
- zkClient.start();
+ zkClient = getZKClient(config, zkConnectString);
- childrenCache = new PathChildrenCache(zkClient, getWatchPath(), true);
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
@@ -75,97 +66,104 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
}));
}
+ //keep a single shared zkClient per KylinConfig
+ private static CuratorFramework getZKClient(KylinConfig config, String zkConnectString) {
+ CuratorFramework zkClient = CACHE.get(config);
+ if (zkClient == null) {
+ synchronized (ZookeeperDistributedJobLock.class) {
+ zkClient = CACHE.get(config);
+ if (zkClient == null) {
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ zkClient = CuratorFrameworkFactory.newClient(zkConnectString, 120000, 15000, retryPolicy);
+ zkClient.start();
+ CACHE.put(config, zkClient);
+ if (CACHE.size() > 1) {
+ logger.warn("More than one singleton exist");
+ }
+ }
+ }
+ }
+ return zkClient;
+ }
+
/**
- * Lock the segment with the segmentId and serverName.
- *
- * <p> if the segment related job want to be scheduled,
- * it must acquire the segment lock. segmentId is used to get the lock path,
- * serverName marked which job server keep the segment lock.
+ * Try to lock the given lockPath on behalf of lockClient. If the lock is acquired,
+ * the lockClient is written as the data of the lockPath node.
*
- * @param segmentId the id of segment need to lock
+ * @param lockPath the path will create in zookeeper
*
- * @param serverName the hostname of job server
+ * @param lockClient the mark of client
*
- * @return <tt>true</tt> if the segment locked successfully
+ * @return <tt>true</tt> if the lock was acquired or the lockClient already holds it
*
* @since 2.0
*/
@Override
- public boolean lockWithName(String segmentId, String serverName) {
- String lockPath = getLockPath(segmentId);
- logger.info(serverName + " start lock the segment: " + segmentId);
+ public boolean lockWithClient(String lockPath, String lockClient) {
+ logger.info(lockClient + " start lock the path: " + lockPath);
boolean hasLock = false;
try {
- if (!(zkClient.getState().equals(CuratorFrameworkState.STARTED))) {
- logger.error("zookeeper have not start");
- return false;
- }
if (zkClient.checkExists().forPath(lockPath) != null) {
- if (isKeepLock(serverName, lockPath)) {
+ if (isKeepLock(lockClient, lockPath)) {
hasLock = true;
- logger.info(serverName + " has kept the lock for segment: " + segmentId);
+ logger.info(lockClient + " has kept the lock for the path: " + lockPath);
}
} else {
- zkClient.create().withMode(CreateMode.EPHEMERAL).forPath(lockPath, serverName.getBytes(Charset.forName("UTF-8")));
- if (isKeepLock(serverName, lockPath)) {
+ zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(lockPath, lockClient.getBytes(Charset.forName("UTF-8")));
+ if (isKeepLock(lockClient, lockPath)) {
hasLock = true;
- logger.info(serverName + " lock the segment: " + segmentId + " successfully");
+ logger.info(lockClient + " lock the path: " + lockPath + " successfully");
}
}
} catch (Exception e) {
- logger.error(serverName + " error acquire lock for the segment: " + segmentId, e);
- }
- if (!hasLock) {
- logger.info(serverName + " fail to acquire lock for the segment: " + segmentId);
- return false;
+ logger.error(lockClient + " error acquire lock for the path: " + lockPath, e);
}
- return true;
+ return hasLock;
}
/**
*
- * Returns <tt>true</tt> if, the job server is keeping the lock for the lockPath
+ * Returns <tt>true</tt> if the lockClient currently holds the lock for the lockPath
*
- * @param serverName the hostname of job server
+ * @param lockClient the mark of client
*
- * @param lockPath the zookeeper node path of segment
+ * @param lockPath the zookeeper node path for the lock
*
- * @return <tt>true</tt> if the job server is keeping the lock for the lockPath, otherwise
+ * @return <tt>true</tt> if the lockClient is keeping the lock for the lockPath, otherwise
* <tt>false</tt>
*
* @since 2.0
*/
- private boolean isKeepLock(String serverName, String lockPath) {
+ private boolean isKeepLock(String lockClient, String lockPath) {
try {
if (zkClient.checkExists().forPath(lockPath) != null) {
byte[] data = zkClient.getData().forPath(lockPath);
String lockServerName = new String(data, Charset.forName("UTF-8"));
- return lockServerName.equalsIgnoreCase(serverName);
+ return lockServerName.equalsIgnoreCase(lockClient);
}
} catch (Exception e) {
- logger.error("fail to get the serverName for the path: " + lockPath, e);
+ logger.error("fail to get the lockClient for the path: " + lockPath, e);
}
return false;
}
/**
*
- * Returns <tt>true</tt> if, and only if, the segment has been locked.
+ * Returns <tt>true</tt> if, and only if, the path has been locked.
*
- * @param segmentId the id of segment need to release the lock.
+ * @param lockPath the zookeeper node path for the lock
*
- * @return <tt>true</tt> if the segment has been locked, otherwise
+ * @return <tt>true</tt> if the path has been locked, otherwise
* <tt>false</tt>
*
* @since 2.0
*/
@Override
- public boolean isHasLocked(String segmentId) {
- String lockPath = getLockPath(segmentId);
+ public boolean isHasLocked(String lockPath) {
try {
return zkClient.checkExists().forPath(lockPath) != null;
} catch (Exception e) {
@@ -175,71 +173,66 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
}
/**
- * release the segment lock with the segmentId.
+ * Release the lock at the specified path.
*
- * <p> the segment related zookeeper node will be deleted.
+ * <p> the zookeeper node at that path will be deleted.
*
- * @param segmentId the id of segment need to release the lock
+ * @param lockPath the zookeeper node path for the lock.
*
* @since 2.0
*/
@Override
- public void unlockWithName(String segmentId) {
- String lockPath = getLockPath(segmentId);
+ public void unlock(String lockPath) {
try {
- if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
- if (zkClient.checkExists().forPath(lockPath) != null) {
- zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);
- logger.info("the lock for " + segmentId + " release successfully");
- } else {
- logger.info("the lock for " + segmentId + " has released");
- }
+ if (zkClient.checkExists().forPath(lockPath) != null) {
+ zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);
+ logger.info("the lock for " + lockPath + " release successfully");
+ } else {
+ logger.info("the lock for " + lockPath + " has released");
}
} catch (Exception e) {
- logger.error("error release lock :" + segmentId);
+ logger.error("error release lock :" + lockPath);
throw new RuntimeException(e);
}
}
/**
- * watching all the locked segments related zookeeper nodes change,
- * in order to when one job server is down, other job server can take over the running jobs.
+ * Watch the path so that the client is notified when a zookeeper node under it changes.
+ * Note: the caller is responsible for closing the returned PathChildrenCache once it is no longer needed.
+ *
+ * @param watchPath the path that needs to be watched
*
- * @param pool the threadPool watching the zookeeper node change
+ * @param watchExecutor the executor watching the zookeeper node change
*
- * @param doWatch do the concrete action with the zookeeper node path and zookeeper node data
+ * @param watcherProcess do the concrete action with the node path and node data when zookeeper node changed
+ *
+ * @return the PathChildrenCache watching the path; the caller should close it once it is no longer needed
*
* @since 2.0
*/
@Override
- public void watchLock(ExecutorService pool, final DoWatchLock doWatch) {
+ public PathChildrenCache watch(String watchPath, Executor watchExecutor, final WatcherProcess watcherProcess) {
+ PathChildrenCache cache = new PathChildrenCache(zkClient, watchPath, true);
try {
- childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
- childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
+ cache.start();
+ cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_REMOVED:
- doWatch.doWatch(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8")));
+ watcherProcess.process(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8")));
break;
default:
break;
}
}
- }, pool);
+ }, watchExecutor);
} catch (Exception e) {
logger.warn("watch the zookeeper node fail: " + e);
}
- }
-
- private String getLockPath(String pathName) {
- return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix() + "/" + pathName;
- }
-
- private String getWatchPath() {
- return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix();
+ return cache;
}
@Override
@@ -253,7 +246,6 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
public void close() {
try {
- childrenCache.close();
zkClient.close();
} catch (Exception e) {
logger.error("error occurred to close PathChildrenCache", e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/1a8e2573/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
index 7bf7498..7315d1d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
@@ -35,7 +35,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.common.lock.JobLock;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
[6/7] kylin git commit: Fix NPE in DistributedScheduler
Posted by li...@apache.org.
Fix NPE in DistributedScheduler
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/796d80ff
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/796d80ff
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/796d80ff
Branch: refs/heads/master-KYLIN-2506
Commit: 796d80ff2ca4219709fe594c8581c3218ae4f849
Parents: 875a52e
Author: kangkaisen <ka...@163.com>
Authored: Fri Apr 14 20:10:27 2017 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Apr 16 17:12:50 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/job/impl/threadpool/DistributedScheduler.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/796d80ff/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index b99da7c..c5b03dc 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -294,11 +294,11 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig());
- resumeAllRunningJobs();
-
fetcher = new FetcherRunner();
fetcherPool.scheduleAtFixedRate(fetcher, 10, ExecutableConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
hasStarted = true;
+
+ resumeAllRunningJobs();
}
private void resumeAllRunningJobs() {
[4/7] kylin git commit: KYLIN-2506 Add distributed lock for
GlobalDictionaryBuilder
Posted by li...@apache.org.
KYLIN-2506 Add distributed lock for GlobalDictionaryBuilder
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/875a52e1
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/875a52e1
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/875a52e1
Branch: refs/heads/master-KYLIN-2506
Commit: 875a52e1a63ed3e79155d17cb93e35ff5b1841dd
Parents: 1a8e257
Author: kangkaisen <ka...@163.com>
Authored: Mon Apr 10 16:47:46 2017 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Apr 16 17:12:50 2017 +0800
----------------------------------------------------------------------
.../kylin/dict/AppendTrieDictionaryBuilder.java | 12 +-
.../apache/kylin/dict/GlobalDictHDFSStore.java | 18 +--
.../org/apache/kylin/dict/GlobalDictStore.java | 6 +-
.../kylin/dict/GlobalDictionaryBuilder.java | 103 ++++++++++++++-
.../kylin/dict/AppendTrieDictionaryTest.java | 76 +-----------
.../dict/ITGlobalDictionaryBuilderTest.java | 124 +++++++++++++++++++
6 files changed, 235 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/875a52e1/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
index c35a815..efa681b 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
@@ -20,8 +20,6 @@ package org.apache.kylin.dict;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.BytesUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
@@ -30,7 +28,6 @@ import java.util.TreeMap;
import static com.google.common.base.Preconditions.checkState;
public class AppendTrieDictionaryBuilder {
- private static final Logger logger = LoggerFactory.getLogger(AppendTrieDictionaryBuilder.class);
private final String baseDir;
private final String workingDir;
@@ -42,7 +39,6 @@ public class AppendTrieDictionaryBuilder {
private int nValues;
private BytesConverter bytesConverter;
private TreeMap<DictSliceKey, String> sliceFileMap = new TreeMap<>(); // slice key -> slice file name
- private int counter;
private DictSliceKey curKey;
private DictNode curNode;
@@ -77,11 +73,7 @@ public class AppendTrieDictionaryBuilder {
}
@SuppressWarnings("unchecked")
- public void addValue(String value) {
- if (counter++ > 0 && counter % 1_000_000 == 0) {
- logger.info("processed {} values", counter);
- }
-
+ public void addValue(String value) throws IOException {
byte[] valueBytes = bytesConverter.convertToBytes(value);
if (sliceFileMap.isEmpty()) {
@@ -134,7 +126,7 @@ public class AppendTrieDictionaryBuilder {
return dict;
}
- private void flushCurrentNode() {
+ private void flushCurrentNode() throws IOException {
String newSliceFile = store.writeSlice(workingDir, curKey, curNode);
String oldSliceFile = sliceFileMap.put(curKey, newSliceFile);
if (oldSliceFile != null) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/875a52e1/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java
index d9030d3..7cf5591 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java
@@ -175,18 +175,16 @@ public class GlobalDictHDFSStore extends GlobalDictStore {
}
@Override
- DictSlice readSlice(String directory, String sliceFileName) {
+ DictSlice readSlice(String directory, String sliceFileName) throws IOException {
Path path = new Path(directory, sliceFileName);
logger.info("read slice from {}", path);
try (FSDataInputStream input = fileSystem.open(path, BUFFER_SIZE)) {
return DictSlice.deserializeFrom(input);
- } catch (IOException e) {
- throw new RuntimeException(String.format("read slice %s failed", path), e);
}
}
@Override
- String writeSlice(String workingDir, DictSliceKey key, DictNode slice) {
+ String writeSlice(String workingDir, DictSliceKey key, DictNode slice) throws IOException {
//write new slice
String sliceFile = IndexFormatV2.sliceFileName(key);
Path path = new Path(workingDir, sliceFile);
@@ -195,22 +193,16 @@ public class GlobalDictHDFSStore extends GlobalDictStore {
try (FSDataOutputStream out = fileSystem.create(path, true, BUFFER_SIZE)) {
byte[] bytes = slice.buildTrieBytes();
out.write(bytes);
- } catch (IOException e) {
- throw new RuntimeException(String.format("write slice with key %s into file %s failed", key, path), e);
}
return sliceFile;
}
@Override
- void deleteSlice(String workingDir, String sliceFileName) {
+ void deleteSlice(String workingDir, String sliceFileName) throws IOException {
Path path = new Path(workingDir, sliceFileName);
logger.info("delete slice at {}", path);
- try {
- if (fileSystem.exists(path)) {
- fileSystem.delete(path, false);
- }
- } catch (IOException e) {
- throw new RuntimeException(String.format("delete slice at %s failed", path), e);
+ if (fileSystem.exists(path)) {
+ fileSystem.delete(path, false);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/875a52e1/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java
index 5817868..6a7a20c 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java
@@ -63,7 +63,7 @@ public abstract class GlobalDictStore {
* @return a <i>DictSlice</i>
* @throws IOException on I/O error
*/
- abstract DictSlice readSlice(String workingDir, String sliceFileName);
+ abstract DictSlice readSlice(String workingDir, String sliceFileName) throws IOException;
/**
* Write a slice with the given key to the specified directory.
@@ -73,7 +73,7 @@ public abstract class GlobalDictStore {
* @return file name of the new written slice
* @throws IOException on I/O error
*/
- abstract String writeSlice(String workingDir, DictSliceKey key, DictNode slice);
+ abstract String writeSlice(String workingDir, DictSliceKey key, DictNode slice) throws IOException;
/**
* Delete a slice with the specified file name.
@@ -81,7 +81,7 @@ public abstract class GlobalDictStore {
* @param sliceFileName file name of the slice, should exist
* @throws IOException on I/O error
*/
- abstract void deleteSlice(String workingDir, String sliceFileName);
+ abstract void deleteSlice(String workingDir, String sliceFileName) throws IOException;
/**
* commit the <i>DictSlice</i> and <i>GlobalDictMetadata</i> in workingDir to new versionDir
http://git-wip-us.apache.org/repos/asf/kylin/blob/875a52e1/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index 7921980..9d66b12 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -19,9 +19,17 @@
package org.apache.kylin.dict;
import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.lock.DistributedJobLock;
+import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.Dictionary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* GlobalDictinary based on whole cube, to ensure one value has same dict id in different segments.
@@ -29,8 +37,16 @@ import org.apache.kylin.common.util.Dictionary;
* Created by sunyerui on 16/5/24.
*/
public class GlobalDictionaryBuilder implements IDictionaryBuilder {
- AppendTrieDictionaryBuilder builder;
- int baseId;
+ private AppendTrieDictionaryBuilder builder;
+ private int baseId;
+
+ private DistributedJobLock lock;
+ private String sourceColumn;
+ //the job thread name is UUID+threadID
+ private final String jobUUID = Thread.currentThread().getName();
+ private int counter;
+
+ private static Logger logger = LoggerFactory.getLogger(GlobalDictionaryBuilder.class);
@Override
public void init(DictionaryInfo dictInfo, int baseId) throws IOException {
@@ -38,6 +54,9 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
throw new IllegalArgumentException("GlobalDictinaryBuilder must used with an existing DictionaryInfo");
}
+ sourceColumn = dictInfo.getSourceTable() + "_" + dictInfo.getSourceColumn();
+ lock(sourceColumn);
+
int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
this.builder = new AppendTrieDictionaryBuilder(dictInfo.getResourceDir(), maxEntriesPerSlice);
this.baseId = baseId;
@@ -45,14 +64,88 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
@Override
public boolean addValue(String value) {
- if (value == null)
+ if (++counter % 1_000_000 == 0) {
+ if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+ logger.info("processed {} values", counter);
+ } else {
+ throw new RuntimeException("Failed to create global dictionary on " + sourceColumn + " This client doesn't keep the lock");
+ }
+ }
+
+ if (value == null) {
return false;
- builder.addValue(value);
+ }
+
+ try {
+ builder.addValue(value);
+ } catch (Throwable e) {
+ checkAndUnlock();
+ throw new RuntimeException(String.format("Failed to create global dictionary on %s ", sourceColumn), e);
+ }
+
return true;
}
@Override
public Dictionary<String> build() throws IOException {
- return builder.build(baseId);
+ try {
+ if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+ return builder.build(baseId);
+ }
+ } finally {
+ checkAndUnlock();
+ }
+ return new AppendTrieDictionary<>();
+ }
+
+ private void lock(final String sourceColumn) throws IOException {
+ lock = (DistributedJobLock) ClassUtil.newInstance("org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock");
+
+ if (!lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+ logger.info("{} will wait the lock for {} ", jobUUID, sourceColumn);
+
+ final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(1);
+
+ PathChildrenCache cache = lock.watch(getWatchPath(sourceColumn), MoreExecutors.sameThreadExecutor(), new DistributedJobLock.WatcherProcess() {
+ @Override
+ public void process(String path, String data) {
+ if (!data.equalsIgnoreCase(jobUUID) && lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+ try {
+ bq.put("getLock");
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ });
+
+ long start = System.currentTimeMillis();
+
+ try {
+ bq.take();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ cache.close();
+ }
+
+ logger.info("{} has waited the lock {} ms for {} ", jobUUID, (System.currentTimeMillis() - start), sourceColumn);
+ }
+ }
+
+ private void checkAndUnlock() {
+ if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+ lock.unlock(getLockPath(sourceColumn));
+ }
+ }
+
+ private static final String GLOBAL_DICT_LOCK_PATH = "/kylin/dict/lock";
+
+ private String getLockPath(String pathName) {
+ return GLOBAL_DICT_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName + "/lock";
+ }
+
+ private String getWatchPath(String pathName) {
+ return GLOBAL_DICT_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/875a52e1/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
index 9da5071..e863901 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
@@ -22,7 +22,6 @@ import static org.apache.kylin.dict.GlobalDictHDFSStore.V2_INDEX_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.BufferedReader;
@@ -44,8 +43,6 @@ import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -58,18 +55,14 @@ import org.junit.Ignore;
import org.junit.Test;
public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
-
- private static final UUID uuid = UUID.randomUUID();
- private static final String RESOURCE_DIR = "/dict/append_dict_test/" + uuid;
- private static final String HDFS_DIR = "file:///tmp/kylin_append_dict";
+ private static final String RESOURCE_DIR = "/dict/append_dict_test/" + UUID.randomUUID();
private static String BASE_DIR;
- private static String LOCAL_BASE_DIR = "/tmp/kylin_append_dict/kylin_metadata/resources/GlobalDict" + RESOURCE_DIR + "/";
+ private static String LOCAL_BASE_DIR = "/tmp/kylin/kylin_metadata/resources/GlobalDict" + RESOURCE_DIR + "/";
@Before
public void beforeTest() {
staticCreateTestMetadata();
KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "50000");
- KylinConfig.getInstanceFromEnv().setProperty("kylin.env.hdfs-working-dir", HDFS_DIR);
BASE_DIR = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict" + RESOURCE_DIR + "/";
}
@@ -80,7 +73,7 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
}
private void cleanup() {
- Path basePath = new Path(HDFS_DIR);
+ Path basePath = new Path(BASE_DIR);
try {
HadoopUtil.getFileSystem(basePath).delete(basePath, true);
} catch (IOException e) {
@@ -318,69 +311,6 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
dictionary.getMaxId();
}
- private class SharedBuilderThread extends Thread {
- CountDownLatch startLatch;
- CountDownLatch finishLatch;
- String prefix;
- int count;
-
- SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String prefix, int count) {
- this.startLatch = startLatch;
- this.finishLatch = finishLatch;
- this.prefix = prefix;
- this.count = count;
- }
-
- @Override
- public void run() {
- try {
- AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
- startLatch.countDown();
- for (int i = 0; i < count; i++) {
- builder.addValue(prefix + i);
- }
- builder.build(0);
- finishLatch.countDown();
- } catch (IOException e) {
- }
- }
- }
-
- @Ignore
- @Test
- public void testSharedBuilder() throws IOException, InterruptedException {
- final CountDownLatch startLatch = new CountDownLatch(3);
- final CountDownLatch finishLatch = new CountDownLatch(3);
-
- AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
- Thread t1 = new SharedBuilderThread(startLatch, finishLatch, "t1_", 10000);
- Thread t2 = new SharedBuilderThread(startLatch, finishLatch, "t2_", 10);
- Thread t3 = new SharedBuilderThread(startLatch, finishLatch, "t3_", 100000);
- t1.start();
- t2.start();
- t3.start();
- startLatch.await();
- AppendTrieDictionary dict = builder.build(0);
- assertTrue("AppendDictBuilder Thread too slow", finishLatch.await(3000, TimeUnit.MILLISECONDS));
- assertEquals(110010, dict.getMaxId());
-
- builder = createBuilder(RESOURCE_DIR);
- builder.addValue("success");
- builder.addValue("s");
- dict = builder.build(0);
- for (int i = 0; i < 10000; i++) {
- assertNotEquals(-1, dict.getIdFromValue("t1_" + i));
- }
- for (int i = 0; i < 10; i++) {
- assertNotEquals(-1, dict.getIdFromValue("t2_" + i));
- }
- for (int i = 0; i < 100000; i++) {
- assertNotEquals(-1, dict.getIdFromValue("t3_" + i));
- }
- assertEquals(110011, dict.getIdFromValue("success"));
- assertEquals(110012, dict.getIdFromValue("s"));
- }
-
@Test
public void testSplitContainSuperLongValue() throws IOException {
String superLongValue = "%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B";
http://git-wip-us.apache.org/repos/asf/kylin/blob/875a52e1/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
new file mode 100644
index 0000000..4afaccd
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Created by kangkaisen on 2017/4/10.
+ */
+public class ITGlobalDictionaryBuilderTest extends HBaseMetadataTestCase {
+ private DictionaryInfo dictionaryInfo;
+
+ @Before
+ public void beforeTest() throws Exception {
+ staticCreateTestMetadata();
+ dictionaryInfo = new DictionaryInfo("testTable", "testColumn", 0, "String", null);
+ }
+
+ @After
+ public void afterTest() {
+ cleanup();
+ staticCleanupTestMetadata();
+ }
+
+ private void cleanup() {
+ String BASE_DIR = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict" + dictionaryInfo.getResourceDir() + "/";
+ Path basePath = new Path(BASE_DIR);
+ try {
+ HadoopUtil.getFileSystem(basePath).delete(basePath, true);
+ } catch (IOException e) {
+ }
+ }
+
+ @Test
+ public void testGlobalDictLock() throws IOException, InterruptedException {
+ final CountDownLatch startLatch = new CountDownLatch(3);
+ final CountDownLatch finishLatch = new CountDownLatch(3);
+
+ Thread t1 = new SharedBuilderThread(startLatch, finishLatch, "t1_", 10000);
+ Thread t2 = new SharedBuilderThread(startLatch, finishLatch, "t2_", 10);
+ Thread t3 = new SharedBuilderThread(startLatch, finishLatch, "t3_", 100000);
+ t1.start();
+ t2.start();
+ t3.start();
+ startLatch.await();
+ finishLatch.await();
+
+ GlobalDictionaryBuilder builder = new GlobalDictionaryBuilder();
+ builder.init(dictionaryInfo, 0);
+ builder.addValue("success");
+ Dictionary<String> dict = builder.build();
+
+ for (int i = 0; i < 10000; i++) {
+ assertNotEquals(-1, dict.getIdFromValue("t1_" + i));
+ }
+ for (int i = 0; i < 10; i++) {
+ assertNotEquals(-1, dict.getIdFromValue("t2_" + i));
+ }
+ for (int i = 0; i < 100000; i++) {
+ assertNotEquals(-1, dict.getIdFromValue("t3_" + i));
+ }
+
+ assertEquals(110011, dict.getIdFromValue("success"));
+ }
+
+ private class SharedBuilderThread extends Thread {
+ CountDownLatch startLatch;
+ CountDownLatch finishLatch;
+ String prefix;
+ int count;
+
+ SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String prefix, int count) {
+ this.startLatch = startLatch;
+ this.finishLatch = finishLatch;
+ this.prefix = prefix;
+ this.count = count;
+ }
+
+ @Override
+ public void run() {
+ try {
+ GlobalDictionaryBuilder builder = new GlobalDictionaryBuilder();
+ startLatch.countDown();
+
+ builder.init(dictionaryInfo, 0);
+ for (int i = 0; i < count; i++) {
+ builder.addValue(prefix + i);
+ }
+ builder.build();
+ finishLatch.countDown();
+ } catch (IOException e) {
+ }
+ }
+ }
+}
[7/7] kylin git commit: KYLIN-2506 code review,
split the role of DistributedLock and JobLock
Posted by li...@apache.org.
KYLIN-2506 code review, split the role of DistributedLock and JobLock
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/06bf2a16
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/06bf2a16
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/06bf2a16
Branch: refs/heads/master-KYLIN-2506
Commit: 06bf2a16234b323ddf66c65b4056a5412db46143
Parents: 796d80f
Author: Yang Li <li...@apache.org>
Authored: Sun Apr 16 19:42:54 2017 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Apr 16 19:42:54 2017 +0800
----------------------------------------------------------------------
core-common/pom.xml | 5 ---
.../apache/kylin/common/KylinConfigBase.java | 7 ++++
.../kylin/common/lock/DistributedJobLock.java | 38 --------------------
.../kylin/common/lock/DistributedLock.java | 37 +++++++++++++++++++
.../org/apache/kylin/common/lock/JobLock.java | 26 --------------
.../apache/kylin/common/lock/MockJobLock.java | 33 -----------------
.../kylin/dict/GlobalDictionaryBuilder.java | 28 +++++++--------
.../java/org/apache/kylin/job/Scheduler.java | 2 +-
.../job/impl/threadpool/DefaultScheduler.java | 10 +++---
.../impl/threadpool/DistributedScheduler.java | 27 +++++++-------
.../kylin/job/lock/DistributedJobLock.java | 24 +++++++++++++
.../java/org/apache/kylin/job/lock/JobLock.java | 30 ++++++++++++++++
.../org/apache/kylin/job/lock/MockJobLock.java | 33 +++++++++++++++++
.../job/impl/threadpool/BaseSchedulerTest.java | 2 +-
.../test_case_data/localmeta/kylin.properties | 2 +-
.../test_case_data/sandbox/kylin.properties | 2 +-
.../kylin/job/BaseTestDistributedScheduler.java | 2 +-
.../apache/kylin/rest/service/JobService.java | 2 +-
.../hbase/util/ZookeeperDistributedJobLock.java | 17 +++++----
.../storage/hbase/util/ZookeeperJobLock.java | 6 ++--
20 files changed, 183 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 5b5f78b..95d3c29 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -69,11 +69,6 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4361242..bf6cdb8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -29,6 +29,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.lock.DistributedLock;
+import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -235,6 +237,11 @@ abstract public class KylinConfigBase implements Serializable {
public Map<String, String> getCubeCustomMeasureTypes() {
return getPropertiesByPrefix("kylin.metadata.custom-measure-types.");
}
+
+ public DistributedLock getDistributedLock() {
+ String clsName = getOptional("kylin.metadata.distributed-lock-impl", "org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock");
+ return (DistributedLock) ClassUtil.newInstance(clsName);
+ }
// ============================================================================
// DICTIONARY & SNAPSHOT
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
deleted file mode 100644
index 00d1ca4..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.lock;
-
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-
-import java.util.concurrent.Executor;
-
-public interface DistributedJobLock extends JobLock {
-
- boolean lockWithClient(String lockPath, String lockClient);
-
- boolean isHasLocked(String lockPath);
-
- void unlock(String lockPath);
-
- PathChildrenCache watch(String watchPath, Executor watchExecutor, WatcherProcess process);
-
- public interface WatcherProcess {
- void process(String path, String data);
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java
new file mode 100644
index 0000000..ead7714
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.lock;
+
+import java.io.Closeable;
+import java.util.concurrent.Executor;
+
+public interface DistributedLock extends Closeable {
+
+ boolean lockPath(String lockPath, String lockClient);
+
+ boolean isPathLocked(String lockPath);
+
+ void unlockPath(String lockPath);
+
+ Closeable watchPath(String watchPath, Executor watchExecutor, Watcher process);
+
+ public interface Watcher {
+ void process(String path, String data);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
deleted file mode 100644
index 5802d71..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.lock;
-
-
-public interface JobLock {
- boolean lock();
-
- void unlock();
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
deleted file mode 100644
index f8233be..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.lock;
-
-/**
- */
-public class MockJobLock implements JobLock {
- @Override
- public boolean lock() {
- return true;
- }
-
- @Override
- public void unlock() {
- return;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index 9d66b12..f2ed375 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -18,19 +18,19 @@
package org.apache.kylin.dict;
+import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.lock.DistributedJobLock;
-import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.util.Dictionary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.MoreExecutors;
+
/**
* GlobalDictinary based on whole cube, to ensure one value has same dict id in different segments.
* GlobalDictinary mainly used for count distinct measure to support rollup among segments.
@@ -40,7 +40,7 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
private AppendTrieDictionaryBuilder builder;
private int baseId;
- private DistributedJobLock lock;
+ private DistributedLock lock;
private String sourceColumn;
//the job thread name is UUID+threadID
private final String jobUUID = Thread.currentThread().getName();
@@ -65,7 +65,7 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
@Override
public boolean addValue(String value) {
if (++counter % 1_000_000 == 0) {
- if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+ if (lock.lockPath(getLockPath(sourceColumn), jobUUID)) {
logger.info("processed {} values", counter);
} else {
throw new RuntimeException("Failed to create global dictionary on " + sourceColumn + " This client doesn't keep the lock");
@@ -89,7 +89,7 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
@Override
public Dictionary<String> build() throws IOException {
try {
- if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+ if (lock.lockPath(getLockPath(sourceColumn), jobUUID)) {
return builder.build(baseId);
}
} finally {
@@ -99,17 +99,17 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
}
private void lock(final String sourceColumn) throws IOException {
- lock = (DistributedJobLock) ClassUtil.newInstance("org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock");
+ lock = KylinConfig.getInstanceFromEnv().getDistributedLock();
- if (!lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+ if (!lock.lockPath(getLockPath(sourceColumn), jobUUID)) {
logger.info("{} will wait the lock for {} ", jobUUID, sourceColumn);
final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(1);
- PathChildrenCache cache = lock.watch(getWatchPath(sourceColumn), MoreExecutors.sameThreadExecutor(), new DistributedJobLock.WatcherProcess() {
+ Closeable watch = lock.watchPath(getWatchPath(sourceColumn), MoreExecutors.sameThreadExecutor(), new DistributedLock.Watcher() {
@Override
public void process(String path, String data) {
- if (!data.equalsIgnoreCase(jobUUID) && lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+ if (!data.equalsIgnoreCase(jobUUID) && lock.lockPath(getLockPath(sourceColumn), jobUUID)) {
try {
bq.put("getLock");
} catch (InterruptedException e) {
@@ -126,7 +126,7 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
- cache.close();
+ watch.close();
}
logger.info("{} has waited the lock {} ms for {} ", jobUUID, (System.currentTimeMillis() - start), sourceColumn);
@@ -134,8 +134,8 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
}
private void checkAndUnlock() {
- if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
- lock.unlock(getLockPath(sourceColumn));
+ if (lock.lockPath(getLockPath(sourceColumn), jobUUID)) {
+ lock.unlockPath(getLockPath(sourceColumn));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
index e2cfd44..93d2510 100644
--- a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
@@ -21,7 +21,7 @@ package org.apache.kylin.job;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.common.lock.JobLock;
+import org.apache.kylin.job.lock.JobLock;
/**
*/
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 688708e..8b6b5aa 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -40,7 +40,7 @@ import org.apache.kylin.job.execution.Executable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.common.lock.JobLock;
+import org.apache.kylin.job.lock.JobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -187,8 +187,8 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
}
@Override
- public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException {
- this.jobLock = jobLock;
+ public synchronized void init(JobEngineConfig jobEngineConfig, JobLock lock) throws SchedulerException {
+ jobLock = lock;
String serverMode = jobEngineConfig.getConfig().getServerMode();
if (!("job".equals(serverMode.toLowerCase()) || "all".equals(serverMode.toLowerCase()))) {
@@ -205,7 +205,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
this.jobEngineConfig = jobEngineConfig;
- if (jobLock.lock() == false) {
+ if (jobLock.lockJobEngine() == false) {
throw new IllegalStateException("Cannot start job scheduler due to lack of job lock");
}
@@ -226,7 +226,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
@Override
public void shutdown() throws SchedulerException {
logger.info("Shutingdown Job Engine ....");
- jobLock.unlock();
+ jobLock.unlockJobEngine();
fetcherPool.shutdown();
jobPool.shutdown();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index c5b03dc..d544320 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -18,6 +18,7 @@
package org.apache.kylin.job.impl.threadpool;
+import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -34,10 +35,10 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -50,8 +51,8 @@ import org.apache.kylin.job.execution.Executable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.common.lock.DistributedJobLock;
-import org.apache.kylin.common.lock.JobLock;
+import org.apache.kylin.job.lock.DistributedJobLock;
+import org.apache.kylin.job.lock.JobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,8 +73,8 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
private ExecutorService watchPool;
private ExecutorService jobPool;
private DefaultContext context;
- private DistributedJobLock jobLock;
- private PathChildrenCache lockWatch;
+ private DistributedLock jobLock;
+ private Closeable lockWatch;
private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
private static final ConcurrentMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>();
@@ -181,7 +182,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
public void run() {
try (SetThreadName ignored = new SetThreadName("Job %s", executable.getId())) {
String segmentId = executable.getParam(SEGMENT_ID);
- if (jobLock.lockWithClient(getLockPath(segmentId), serverName)) {
+ if (jobLock.lockPath(getLockPath(segmentId), serverName)) {
logger.info(executable.toString() + " scheduled in server: " + serverName);
context.addRunningJob(executable);
@@ -209,7 +210,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
if (state != ExecutableState.READY && state != ExecutableState.RUNNING) {
if (segmentWithLocks.contains(segmentId)) {
logger.info(executable.toString() + " will release the lock for the segment: " + segmentId);
- jobLock.unlock(getLockPath(segmentId));
+ jobLock.unlockPath(getLockPath(segmentId));
segmentWithLocks.remove(segmentId);
}
}
@@ -218,7 +219,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
}
//when the segment lock released but the segment related job still running, resume the job.
- private class WatcherProcessImpl implements org.apache.kylin.common.lock.DistributedJobLock.WatcherProcess {
+ private class WatcherProcessImpl implements org.apache.kylin.common.lock.DistributedLock.Watcher {
private String serverName;
public WatcherProcessImpl(String serverName) {
@@ -237,7 +238,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
if (executable instanceof DefaultChainedExecutable && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) && !nodeData.equalsIgnoreCase(serverName)) {
try {
logger.warn(nodeData + " has released the lock for: " + segmentId + " but the job still running. so " + serverName + " resume the job");
- if (!jobLock.isHasLocked(getLockPath(segmentId))) {
+ if (!jobLock.isPathLocked(getLockPath(segmentId))) {
executableManager.resumeRunningJobForce(executable.getId());
fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
break;
@@ -264,7 +265,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
}
@Override
- public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException {
+ public synchronized void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException {
String serverMode = jobEngineConfig.getConfig().getServerMode();
if (!("job".equals(serverMode.toLowerCase()) || "all".equals(serverMode.toLowerCase()))) {
logger.info("server mode: " + serverMode + ", no need to run job scheduler");
@@ -288,7 +289,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
//watch the zookeeper node change, so that when one job server is down, other job servers can take over.
watchPool = Executors.newFixedThreadPool(1);
WatcherProcessImpl watcherProcess = new WatcherProcessImpl(this.serverName);
- lockWatch = this.jobLock.watch(getWatchPath(), watchPool, watcherProcess);
+ lockWatch = this.jobLock.watchPath(getWatchPath(), watchPool, watcherProcess);
int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
@@ -307,7 +308,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
AbstractExecutable executable = executableManager.getJob(id);
if (output.getState() == ExecutableState.RUNNING && executable instanceof DefaultChainedExecutable) {
try {
- if (!jobLock.isHasLocked(executable.getParam(SEGMENT_ID))) {
+ if (!jobLock.isPathLocked(executable.getParam(SEGMENT_ID))) {
executableManager.resumeRunningJobForce(executable.getId());
fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
}
@@ -348,7 +349,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
private void releaseAllLocks() {
for (String segmentId : segmentWithLocks) {
- jobLock.unlock(getLockPath(segmentId));
+ jobLock.unlockPath(getLockPath(segmentId));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
new file mode 100644
index 0000000..e5e2a1e
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.lock;
+
+import org.apache.kylin.common.lock.DistributedLock;
+
+public interface DistributedJobLock extends JobLock, DistributedLock {
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
new file mode 100644
index 0000000..1b6b29e
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.lock;
+
+/**
+ * Among a Kylin cluster, usually only one node runs as the job engine and does the scheduling of build jobs.
+ * This interface is for such negotiation.
+ */
+public interface JobLock {
+
+ boolean lockJobEngine();
+
+ void unlockJobEngine();
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
new file mode 100644
index 0000000..73f6192
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.lock;
+
+/**
+ */
+public class MockJobLock implements JobLock {
+ @Override
+ public boolean lockJobEngine() {
+ return true;
+ }
+
+ @Override
+ public void unlockJobEngine() {
+ return;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index 1bafa34..1ada9a1 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -28,7 +28,7 @@ import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.common.lock.MockJobLock;
+import org.apache.kylin.job.lock.MockJobLock;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index 3866575..969e466 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -133,7 +133,7 @@ kylin.security.saml.context-path=/kylin
kylin.test.bcc.new.key=some-value
kylin.engine.mr.config-override.test1=test1
kylin.engine.mr.config-override.test2=test2
-kylin.job.lock=org.apache.kylin.common.lock.MockJobLock
+kylin.job.lock=org.apache.kylin.job.lock.MockJobLock
kylin.engine.provider.0=org.apache.kylin.engine.mr.MRBatchCubingEngine
kylin.engine.provider.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index c0a4968..684b4dd 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -112,7 +112,7 @@ kylin.query.udf.concat=org.apache.kylin.query.udf.ConcatUDF
kylin.query.udf.version=org.apache.kylin.query.udf.VersionUDF
# for test
-kylin.job.lock=org.apache.kylin.common.lock.MockJobLock
+kylin.job.lock=org.apache.kylin.job.lock.MockJobLock
kylin.engine.mr.uhc-reducer-count=3
### CUBE ###
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
index 4877ca1..b4ac42f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -167,7 +167,7 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
}
boolean lock(ZookeeperDistributedJobLock jobLock, String cubeName, String serverName) {
- return jobLock.lockWithClient(getLockPath(cubeName), serverName);
+ return jobLock.lockPath(getLockPath(cubeName), serverName);
}
private static void initZk() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 31d1ded..4ba426e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -51,7 +51,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.common.lock.JobLock;
+import org.apache.kylin.job.lock.JobLock;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.rest.constant.Constant;
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
index 5f5a721..eb01e4b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -32,7 +32,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.lock.DistributedJobLock;
+import org.apache.kylin.job.lock.DistributedJobLock;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +40,9 @@ import org.slf4j.LoggerFactory;
public class ZookeeperDistributedJobLock implements DistributedJobLock {
private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class);
+ @SuppressWarnings("unused")
private final KylinConfig config;
+
private static final ConcurrentMap<KylinConfig, CuratorFramework> CACHE = new ConcurrentHashMap<KylinConfig, CuratorFramework>();
private final CuratorFramework zkClient;
@@ -100,7 +102,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
*/
@Override
- public boolean lockWithClient(String lockPath, String lockClient) {
+ public boolean lockPath(String lockPath, String lockClient) {
logger.info(lockClient + " start lock the path: " + lockPath);
boolean hasLock = false;
@@ -163,7 +165,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
*/
@Override
- public boolean isHasLocked(String lockPath) {
+ public boolean isPathLocked(String lockPath) {
try {
return zkClient.checkExists().forPath(lockPath) != null;
} catch (Exception e) {
@@ -183,7 +185,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
*/
@Override
- public void unlock(String lockPath) {
+ public void unlockPath(String lockPath) {
try {
if (zkClient.checkExists().forPath(lockPath) != null) {
zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);
@@ -213,7 +215,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
*/
@Override
- public PathChildrenCache watch(String watchPath, Executor watchExecutor, final WatcherProcess watcherProcess) {
+ public PathChildrenCache watchPath(String watchPath, Executor watchExecutor, final Watcher watcherProcess) {
PathChildrenCache cache = new PathChildrenCache(zkClient, watchPath, true);
try {
cache.start();
@@ -236,14 +238,15 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
}
@Override
- public boolean lock() {
+ public boolean lockJobEngine() {
return true;
}
@Override
- public void unlock() {
+ public void unlockJobEngine() {
}
+ @Override
public void close() {
try {
zkClient.close();
http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
index 7315d1d..6a3cf7e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
@@ -35,7 +35,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.lock.JobLock;
+import org.apache.kylin.job.lock.JobLock;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -56,7 +56,7 @@ public class ZookeeperJobLock implements JobLock {
private CuratorFramework zkClient;
@Override
- public boolean lock() {
+ public boolean lockJobEngine() {
this.scheduleID = schedulerId();
String zkConnectString = getZKConnectString();
logger.info("zk connection string:" + zkConnectString);
@@ -100,7 +100,7 @@ public class ZookeeperJobLock implements JobLock {
}
@Override
- public void unlock() {
+ public void unlockJobEngine() {
releaseLock();
}