You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/06/05 05:23:01 UTC
[13/67] [abbrv] kylin git commit: Revert "reformat code"
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
index 869146b..f018f28 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
@@ -16,14 +16,13 @@
* limitations under the License.
*/
-package org.apache.kylin.engine.mr.steps;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
+package org.apache.kylin.engine.mr.steps;
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
@@ -33,10 +32,11 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
@Ignore
public class NewCubeSamplingMethodTest {
@@ -64,6 +64,7 @@ public class NewCubeSamplingMethodTest {
compareAccuracyBasic(dataSet);
}
+
@Ignore
@Test
public void testSmallCardData() throws Exception {
@@ -72,6 +73,7 @@ public class NewCubeSamplingMethodTest {
compareAccuracyBasic(dataSet);
}
+
public void comparePerformanceBasic(final List<List<String>> rows) throws Exception {
//old hash method
ByteArray[] colHashValues = getNewColHashValues(ROW_LENGTH);
@@ -120,11 +122,11 @@ public class NewCubeSamplingMethodTest {
counter.add(hc.hash().asBytes());
}
long estimate = counter.getCountEstimate();
- System.out.println("old method finished. Estimate cardinality : " + estimate + ". Error rate : "
- + countErrorRate(estimate, realCardinality));
+ System.out.println("old method finished. Estimate cardinality : " + estimate + ". Error rate : " + countErrorRate(estimate, realCardinality));
}
});
+
long t2 = runAndGetTime(new TestCase() {
@Override
public void run() throws Exception {
@@ -147,8 +149,7 @@ public class NewCubeSamplingMethodTest {
counter.addHashDirectly(value);
}
long estimate = counter.getCountEstimate();
- System.out.println("new method finished. Estimate cardinality : " + estimate + ". Error rate : "
- + countErrorRate(estimate, realCardinality));
+ System.out.println("new method finished. Estimate cardinality : " + estimate + ". Error rate : " + countErrorRate(estimate, realCardinality));
}
});
}
@@ -178,6 +179,7 @@ public class NewCubeSamplingMethodTest {
return counters;
}
+
private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet) {
Integer[] indice = new Integer[Long.bitCount(cuboidId)];
@@ -206,8 +208,7 @@ public class NewCubeSamplingMethodTest {
void run() throws Exception;
}
- private void putRowKeyToHLL(List<String> row, ByteArray[] colHashValues, HLLCounter[] cuboidCounters,
- HashFunction hashFunction) {
+ private void putRowKeyToHLL(List<String> row, ByteArray[] colHashValues, HLLCounter[] cuboidCounters, HashFunction hashFunction) {
int x = 0;
for (String field : row) {
Hasher hc = hashFunction.newHasher();
@@ -224,8 +225,7 @@ public class NewCubeSamplingMethodTest {
}
}
- private void putRowKeyToHLLNew(List<String> row, long[] hashValuesLong, HLLCounter[] cuboidCounters,
- HashFunction hashFunction) {
+ private void putRowKeyToHLLNew(List<String> row, long[] hashValuesLong, HLLCounter[] cuboidCounters, HashFunction hashFunction) {
int x = 0;
for (String field : row) {
Hasher hc = hashFunction.newHasher();
@@ -266,7 +266,7 @@ public class NewCubeSamplingMethodTest {
return row;
}
- private String[] smallCardRow = { "abc", "bcd", "jifea", "feaifj" };
+ private String[] smallCardRow = {"abc", "bcd", "jifea", "feaifj"};
private Random rand = new Random(System.currentTimeMillis());
@@ -279,6 +279,7 @@ public class NewCubeSamplingMethodTest {
return row;
}
+
private int countCardinality(List<List<String>> rows) {
Set<String> diffCols = new HashSet<String>();
for (List<String> row : rows) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
index ab55bcf..414ab95 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
@@ -67,7 +67,7 @@ public class NumberDictionaryForestTest {
//stimulate map-reduce job
ArrayList<SelfDefineSortableKey> keyList = createKeyList(humanList, (byte) flag.ordinal());
Collections.sort(keyList);
-
+
//build tree
NumberDictionaryForestBuilder b = new NumberDictionaryForestBuilder(0, 0);
expectedList = numberSort(expectedList);
@@ -76,7 +76,7 @@ public class NumberDictionaryForestTest {
}
TrieDictionaryForest<String> dict = b.build();
dict.dump(System.out);
-
+
ArrayList<Integer> resultIds = new ArrayList<>();
for (int i = 0; i < keyList.size(); i++) {
SelfDefineSortableKey key = keyList.get(i);
@@ -84,7 +84,7 @@ public class NumberDictionaryForestTest {
resultIds.add(dict.getIdFromValue(fieldValue));
assertEquals(expectedList.get(i), dict.getValueFromId(dict.getIdFromValue(fieldValue)));
}
-
+
assertTrue(isIncreasedOrder(resultIds, new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
@@ -101,8 +101,7 @@ public class NumberDictionaryForestTest {
double d1 = Double.parseDouble(o1);
double d2 = Double.parseDouble(o2);
return Double.compare(d1, d2);
- }
- });
+ }});
return result;
}
@@ -291,18 +290,16 @@ public class NumberDictionaryForestTest {
int flag;
T previous = null;
for (T t : list) {
- if (previous == null)
- previous = t;
+ if (previous == null) previous = t;
else {
flag = comp.compare(previous, t);
- if (flag > 0)
- return false;
+ if (flag > 0) return false;
previous = t;
}
}
return true;
}
-
+
@Test
public void testNormalizeNumber() {
assertEquals("0", Number2BytesConverter.normalizeNumber("+0000.000"));
@@ -314,7 +311,7 @@ public class NumberDictionaryForestTest {
assertEquals("200", Number2BytesConverter.normalizeNumber("200"));
assertEquals("200", Number2BytesConverter.normalizeNumber("200.00"));
assertEquals("200.01", Number2BytesConverter.normalizeNumber("200.010"));
-
+
for (int i = -100; i < 101; i++) {
String expected = "" + i;
int cut = expected.startsWith("-") ? 1 : 0;
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
index 231387b..551998f 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
@@ -56,8 +56,7 @@ public class SelfDefineSortableKeyTest {
System.out.println("test numbers:" + longList);
ArrayList<String> strNumList = listToStringList(longList);
//System.out.println("test num strs list:"+strNumList);
- ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList,
- (byte) SelfDefineSortableKey.TypeFlag.INTEGER_FAMILY_TYPE.ordinal());
+ ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) SelfDefineSortableKey.TypeFlag.INTEGER_FAMILY_TYPE.ordinal());
System.out.println(keyList.get(0).isIntegerFamily());
Collections.sort(keyList);
ArrayList<String> strListAftereSort = new ArrayList<>();
@@ -93,8 +92,7 @@ public class SelfDefineSortableKeyTest {
System.out.println("test numbers:" + doubleList);
ArrayList<String> strNumList = listToStringList(doubleList);
//System.out.println("test num strs list:"+strNumList);
- ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList,
- (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+ ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
Collections.sort(keyList);
ArrayList<String> strListAftereSort = new ArrayList<>();
for (SelfDefineSortableKey key : keyList) {
@@ -123,8 +121,7 @@ public class SelfDefineSortableKeyTest {
strList.add("hello"); //duplicate
strList.add("123");
strList.add("");
- ArrayList<SelfDefineSortableKey> keyList = createKeyList(strList,
- (byte) SelfDefineSortableKey.TypeFlag.NONE_NUMERIC_TYPE.ordinal());
+ ArrayList<SelfDefineSortableKey> keyList = createKeyList(strList, (byte) SelfDefineSortableKey.TypeFlag.NONE_NUMERIC_TYPE.ordinal());
Collections.sort(keyList);
ArrayList<String> strListAftereSort = new ArrayList<>();
for (SelfDefineSortableKey key : keyList) {
@@ -154,16 +151,17 @@ public class SelfDefineSortableKeyTest {
doubleList.add(-Double.MAX_VALUE);
//System.out.println(Double.MIN_VALUE);
+
ArrayList<String> strNumList = listToStringList(doubleList);
//System.out.println("test num strs list:"+strNumList);
- ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList,
- (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+ ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
System.out.println("start to test str sort");
long start = System.currentTimeMillis();
Collections.sort(strNumList);
System.out.println("sort time : " + (System.currentTimeMillis() - start));
+
System.out.println("start to test double sort");
start = System.currentTimeMillis();
Collections.sort(keyList);
@@ -191,6 +189,7 @@ public class SelfDefineSortableKeyTest {
System.out.println("sort time : " + (System.currentTimeMillis() - start));
}
+
@Test
public void testIllegalNumber() {
Random rand = new Random(System.currentTimeMillis());
@@ -211,11 +210,10 @@ public class SelfDefineSortableKeyTest {
strNumList.add("fjaeif"); //illegal type
//System.out.println("test num strs list:"+strNumList);
try {
- ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList,
- (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+ ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) SelfDefineSortableKey.TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
Collections.sort(keyList);
fail("Need catch exception");
- } catch (Exception e) {
+ }catch(Exception e){
//correct
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index cf6d0a8..66b154d 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -47,8 +47,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
- sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(),
- seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+ sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), KylinConfig.getKylinConfPath());
sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
index ef39c69..6478c10 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
@@ -37,8 +37,7 @@ import scala.Tuple2;
*/
public class SparkCountDemo extends AbstractApplication {
- private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true)
- .withDescription("Input path").create("input");
+ private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
private Options options;
@@ -57,29 +56,25 @@ public class SparkCountDemo extends AbstractApplication {
String logFile = "hdfs://10.249.65.231:8020/tmp/kylin.properties"; // Should be some file on your system
SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
- final JavaPairRDD<String, Integer> logData = sc.textFile(logFile)
- .mapToPair(new PairFunction<String, String, Integer>() {
+ final JavaPairRDD<String, Integer> logData = sc.textFile(logFile).mapToPair(new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) throws Exception {
- return new Tuple2<String, Integer>(s, s.length());
- }
- }).sortByKey();
+ @Override
+ public Tuple2<String, Integer> call(String s) throws Exception {
+ return new Tuple2<String, Integer>(s, s.length());
+ }
+ }).sortByKey();
logData.persist(StorageLevel.MEMORY_AND_DISK_SER());
System.out.println("line number:" + logData.count());
logData.mapToPair(new PairFunction<Tuple2<String, Integer>, ImmutableBytesWritable, KeyValue>() {
@Override
- public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<String, Integer> stringIntegerTuple2)
- throws Exception {
+ public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
ImmutableBytesWritable key = new ImmutableBytesWritable(stringIntegerTuple2._1().getBytes());
- KeyValue value = new KeyValue(stringIntegerTuple2._1().getBytes(), "f".getBytes(), "c".getBytes(),
- String.valueOf(stringIntegerTuple2._2()).getBytes());
+ KeyValue value = new KeyValue(stringIntegerTuple2._1().getBytes(), "f".getBytes(), "c".getBytes(), String.valueOf(stringIntegerTuple2._2()).getBytes());
return new Tuple2(key, value);
}
- }).saveAsNewAPIHadoopFile("hdfs://10.249.65.231:8020/tmp/hfile", ImmutableBytesWritable.class, KeyValue.class,
- HFileOutputFormat.class);
+ }).saveAsNewAPIHadoopFile("hdfs://10.249.65.231:8020/tmp/hfile", ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index c3326ff..2a0981a 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -120,16 +120,11 @@ public class SparkCubing extends AbstractApplication {
protected static final Logger logger = LoggerFactory.getLogger(SparkCubing.class);
- private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true)
- .withDescription("Hive Intermediate Table").create("hiveTable");
- private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
- .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
- private static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
- .withDescription("Cube Segment Id").create("segmentId");
- private static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true)
- .withDescription("Configuration Path").create("confPath");
- private static final Option OPTION_COPROCESSOR = OptionBuilder.withArgName("coprocessor").hasArg().isRequired(true)
- .withDescription("Coprocessor Jar Path").create("coprocessor");
+ private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
+ private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+ private static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
+ private static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
+ private static final Option OPTION_COPROCESSOR = OptionBuilder.withArgName("coprocessor").hasArg().isRequired(true).withDescription("Coprocessor Jar Path").create("coprocessor");
private Options options;
@@ -192,10 +187,8 @@ public class SparkCubing extends AbstractApplication {
final CubeSegment seg = cubeInstance.getSegmentById(segmentId);
final CubeDesc cubeDesc = cubeInstance.getDescriptor();
final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
- final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(
- EngineFactory.getJoinedFlatTableDesc(seg), cubeDesc);
- final List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc))
- .getColumns();
+ final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(seg), cubeDesc);
+ final List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
final long start = System.currentTimeMillis();
final RowKeyDesc rowKey = cubeDesc.getRowkey();
for (int i = 0; i < baseCuboidColumn.size(); i++) {
@@ -214,36 +207,35 @@ public class SparkCubing extends AbstractApplication {
final DataFrame frame = intermediateTable.select(column).distinct();
final Row[] rows = frame.collect();
- dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionary(tblColRef.getType(),
- new IterableDictionaryValueEnumerator(new Iterable<String>() {
- @Override
- public Iterator<String> iterator() {
- return new Iterator<String>() {
- int i = 0;
+ dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionary(tblColRef.getType(), new IterableDictionaryValueEnumerator(new Iterable<String>() {
+ @Override
+ public Iterator<String> iterator() {
+ return new Iterator<String>() {
+ int i = 0;
- @Override
- public boolean hasNext() {
- return i < rows.length;
- }
+ @Override
+ public boolean hasNext() {
+ return i < rows.length;
+ }
- @Override
- public String next() {
- if (hasNext()) {
- final Row row = rows[i++];
- final Object o = row.get(0);
- return o != null ? o.toString() : null;
- } else {
- throw new NoSuchElementException();
- }
- }
+ @Override
+ public String next() {
+ if (hasNext()) {
+ final Row row = rows[i++];
+ final Object o = row.get(0);
+ return o != null ? o.toString() : null;
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
}
- })));
+ };
+ }
+ })));
}
final long end = System.currentTimeMillis();
CubingUtils.writeDictionary(seg, dictionaryMap, start, end);
@@ -256,8 +248,7 @@ public class SparkCubing extends AbstractApplication {
}
}
- private Map<Long, HLLCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName,
- String segmentId) throws Exception {
+ private Map<Long, HLLCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception {
CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
CubeDesc cubeDesc = cubeInstance.getDescriptor();
@@ -268,8 +259,7 @@ public class SparkCubing extends AbstractApplication {
zeroValue.put(id, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
}
- CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(
- EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+ CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
final int[] rowKeyColumnIndexes = flatDesc.getRowKeyColumnIndexes();
final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
@@ -295,56 +285,52 @@ public class SparkCubing extends AbstractApplication {
row_hashcodes[i] = new ByteArray();
}
- final HashMap<Long, HLLCounter> samplingResult = rowJavaRDD.aggregate(zeroValue,
- new Function2<HashMap<Long, HLLCounter>, List<String>, HashMap<Long, HLLCounter>>() {
+ final HashMap<Long, HLLCounter> samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2<HashMap<Long, HLLCounter>, List<String>, HashMap<Long, HLLCounter>>() {
- final HashFunction hashFunction = Hashing.murmur3_128();
+ final HashFunction hashFunction = Hashing.murmur3_128();
- @Override
- public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, List<String> v2)
- throws Exception {
- for (int i = 0; i < nRowKey; i++) {
- Hasher hc = hashFunction.newHasher();
- String colValue = v2.get(rowKeyColumnIndexes[i]);
- if (colValue != null) {
- row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
- } else {
- row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
- }
- }
-
- for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
- Hasher hc = hashFunction.newHasher();
- HLLCounter counter = v1.get(entry.getKey());
- final Integer[] cuboidBitSet = entry.getValue();
- for (int position = 0; position < cuboidBitSet.length; position++) {
- hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
- }
- counter.add(hc.hash().asBytes());
- }
- return v1;
+ @Override
+ public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, List<String> v2) throws Exception {
+ for (int i = 0; i < nRowKey; i++) {
+ Hasher hc = hashFunction.newHasher();
+ String colValue = v2.get(rowKeyColumnIndexes[i]);
+ if (colValue != null) {
+ row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
+ } else {
+ row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
}
- }, new Function2<HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>>() {
- @Override
- public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, HashMap<Long, HLLCounter> v2)
- throws Exception {
- Preconditions.checkArgument(v1.size() == v2.size());
- Preconditions.checkArgument(v1.size() > 0);
- for (Map.Entry<Long, HLLCounter> entry : v1.entrySet()) {
- final HLLCounter counter1 = entry.getValue();
- final HLLCounter counter2 = v2.get(entry.getKey());
- counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
- }
- return v1;
+ }
+
+ for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
+ Hasher hc = hashFunction.newHasher();
+ HLLCounter counter = v1.get(entry.getKey());
+ final Integer[] cuboidBitSet = entry.getValue();
+ for (int position = 0; position < cuboidBitSet.length; position++) {
+ hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
}
+ counter.add(hc.hash().asBytes());
+ }
+ return v1;
+ }
+ }, new Function2<HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>>() {
+ @Override
+ public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, HashMap<Long, HLLCounter> v2) throws Exception {
+ Preconditions.checkArgument(v1.size() == v2.size());
+ Preconditions.checkArgument(v1.size() > 0);
+ for (Map.Entry<Long, HLLCounter> entry : v1.entrySet()) {
+ final HLLCounter counter1 = entry.getValue();
+ final HLLCounter counter2 = v2.get(entry.getKey());
+ counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
+ }
+ return v1;
+ }
- });
+ });
return samplingResult;
}
/** return hfile location */
- private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId,
- final byte[][] splitKeys) throws Exception {
+ private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId, final byte[][] splitKeys) throws Exception {
CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
CubeDesc cubeDesc = cubeInstance.getDescriptor();
final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
@@ -378,41 +364,35 @@ public class SparkCubing extends AbstractApplication {
}
}
- final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom()
- .mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
+ final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
- @Override
- public Iterable<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator)
- throws Exception {
- long t = System.currentTimeMillis();
- prepare();
-
- final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv())
- .getCube(cubeName);
-
- LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue();
- System.out.println("load properties finished");
- IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
- AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(),
- flatDesc, dictionaryMap);
- final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(
- new DefaultTupleConverter(cubeInstance.getSegmentById(segmentId), columnLengthMap));
- Executors.newCachedThreadPool()
- .submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter));
- try {
- while (listIterator.hasNext()) {
- for (List<String> row : listIterator.next()) {
- blockingQueue.put(row);
- }
- }
- blockingQueue.put(Collections.<String> emptyList());
- } catch (Exception e) {
- throw new RuntimeException(e);
+ @Override
+ public Iterable<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator) throws Exception {
+ long t = System.currentTimeMillis();
+ prepare();
+
+ final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+
+ LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue();
+ System.out.println("load properties finished");
+ IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
+ AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap);
+ final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeInstance.getSegmentById(segmentId), columnLengthMap));
+ Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter));
+ try {
+ while (listIterator.hasNext()) {
+ for (List<String> row : listIterator.next()) {
+ blockingQueue.put(row);
}
- System.out.println("build partition cost: " + (System.currentTimeMillis() - t) + "ms");
- return sparkCuboidWriter.getResult();
}
- });
+ blockingQueue.put(Collections.<String> emptyList());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ System.out.println("build partition cost: " + (System.currentTimeMillis() - t) + "ms");
+ return sparkCuboidWriter.getResult();
+ }
+ });
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
Configuration conf = getConfigurationForHFile(cubeSegment.getStorageLocationIdentifier());
@@ -431,9 +411,7 @@ public class SparkCubing extends AbstractApplication {
return url;
}
- private void writeToHFile2(final JavaPairRDD<byte[], byte[]> javaPairRDD, final String[] dataTypes,
- final int measureSize, final MeasureAggregators aggs, final byte[][] splitKeys, final Configuration conf,
- final String hFileLocation) {
+ private void writeToHFile2(final JavaPairRDD<byte[], byte[]> javaPairRDD, final String[] dataTypes, final int measureSize, final MeasureAggregators aggs, final byte[][] splitKeys, final Configuration conf, final String hFileLocation) {
javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
@Override
public int numPartitions() {
@@ -450,52 +428,46 @@ public class SparkCubing extends AbstractApplication {
}
return splitKeys.length;
}
- }, UnsignedBytes.lexicographicalComparator())
- .mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Tuple2<byte[], byte[]>>() {
- @Override
- public Iterable<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator)
- throws Exception {
- return new Iterable<Tuple2<byte[], byte[]>>() {
- final BufferedMeasureCodec codec = new BufferedMeasureCodec(dataTypes);
- final Object[] input = new Object[measureSize];
- final Object[] result = new Object[measureSize];
+ }, UnsignedBytes.lexicographicalComparator()).mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Tuple2<byte[], byte[]>>() {
+ @Override
+ public Iterable<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception {
+ return new Iterable<Tuple2<byte[], byte[]>>() {
+ final BufferedMeasureCodec codec = new BufferedMeasureCodec(dataTypes);
+ final Object[] input = new Object[measureSize];
+ final Object[] result = new Object[measureSize];
+ @Override
+ public Iterator<Tuple2<byte[], byte[]>> iterator() {
+ return IteratorUtils.merge(tuple2Iterator, UnsignedBytes.lexicographicalComparator(), new Function<Iterable<byte[]>, byte[]>() {
@Override
- public Iterator<Tuple2<byte[], byte[]>> iterator() {
- return IteratorUtils.merge(tuple2Iterator, UnsignedBytes.lexicographicalComparator(),
- new Function<Iterable<byte[]>, byte[]>() {
- @Override
- public byte[] call(Iterable<byte[]> v1) throws Exception {
- final LinkedList<byte[]> list = Lists.newLinkedList(v1);
- if (list.size() == 1) {
- return list.get(0);
- }
- aggs.reset();
- for (byte[] v : list) {
- codec.decode(ByteBuffer.wrap(v), input);
- aggs.aggregate(input);
- }
- aggs.collectStates(result);
- ByteBuffer buffer = codec.encode(result);
- byte[] bytes = new byte[buffer.position()];
- System.arraycopy(buffer.array(), buffer.arrayOffset(), bytes, 0,
- buffer.position());
- return bytes;
- }
- });
+ public byte[] call(Iterable<byte[]> v1) throws Exception {
+ final LinkedList<byte[]> list = Lists.newLinkedList(v1);
+ if (list.size() == 1) {
+ return list.get(0);
+ }
+ aggs.reset();
+ for (byte[] v : list) {
+ codec.decode(ByteBuffer.wrap(v), input);
+ aggs.aggregate(input);
+ }
+ aggs.collectStates(result);
+ ByteBuffer buffer = codec.encode(result);
+ byte[] bytes = new byte[buffer.position()];
+ System.arraycopy(buffer.array(), buffer.arrayOffset(), bytes, 0, buffer.position());
+ return bytes;
}
- };
- }
- }, true).mapToPair(new PairFunction<Tuple2<byte[], byte[]>, ImmutableBytesWritable, KeyValue>() {
- @Override
- public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<byte[], byte[]> tuple2)
- throws Exception {
- ImmutableBytesWritable key = new ImmutableBytesWritable(tuple2._1());
- KeyValue value = new KeyValue(tuple2._1(), "F1".getBytes(), "M".getBytes(), tuple2._2());
- return new Tuple2(key, value);
+ });
}
- }).saveAsNewAPIHadoopFile(hFileLocation, ImmutableBytesWritable.class, KeyValue.class,
- HFileOutputFormat.class, conf);
+ };
+ }
+ }, true).mapToPair(new PairFunction<Tuple2<byte[], byte[]>, ImmutableBytesWritable, KeyValue>() {
+ @Override
+ public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<byte[], byte[]> tuple2) throws Exception {
+ ImmutableBytesWritable key = new ImmutableBytesWritable(tuple2._1());
+ KeyValue value = new KeyValue(tuple2._1(), "F1".getBytes(), "M".getBytes(), tuple2._2());
+ return new Tuple2(key, value);
+ }
+ }).saveAsNewAPIHadoopFile(hFileLocation, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class, conf);
}
public static void prepare() throws Exception {
@@ -506,16 +478,14 @@ public class SparkCubing extends AbstractApplication {
ClassUtil.addClasspath(confPath);
}
- private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HLLCounter> samplingResult)
- throws Exception {
+ private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HLLCounter> samplingResult) throws Exception {
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
final Map<Long, Long> rowCountMap = CubeStatsReader.getCuboidRowCountMapFromSampling(samplingResult, 100);
final Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSegment, rowCountMap);
System.out.println("cube size estimation:" + cubeSizeMap);
- final byte[][] splitKeys = CreateHTableJob.getRegionSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig,
- cubeSegment, null); //FIXME: passing non-null value for 'hfileSplitsOutputFolder'
+ final byte[][] splitKeys = CreateHTableJob.getRegionSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig, cubeSegment, null); //FIXME: passing non-null value for 'hfileSplitsOutputFolder'
CubeHTableUtil.createHTable(cubeSegment, splitKeys);
System.out.println(cubeSegment.getStorageLocationIdentifier() + " table created");
return splitKeys;
@@ -590,23 +560,22 @@ public class SparkCubing extends AbstractApplication {
setupClasspath(sc, confPath);
intermediateTable.cache();
writeDictionary(intermediateTable, cubeName, segmentId);
- final JavaRDD<List<String>> rowJavaRDD = intermediateTable.javaRDD()
- .map(new org.apache.spark.api.java.function.Function<Row, List<String>>() {
- @Override
- public List<String> call(Row v1) throws Exception {
- ArrayList<String> result = Lists.newArrayListWithExpectedSize(v1.size());
- for (int i = 0; i < v1.size(); i++) {
- final Object o = v1.get(i);
- if (o != null) {
- result.add(o.toString());
- } else {
- result.add(null);
- }
- }
- return result;
-
+ final JavaRDD<List<String>> rowJavaRDD = intermediateTable.javaRDD().map(new org.apache.spark.api.java.function.Function<Row, List<String>>() {
+ @Override
+ public List<String> call(Row v1) throws Exception {
+ ArrayList<String> result = Lists.newArrayListWithExpectedSize(v1.size());
+ for (int i = 0; i < v1.size(); i++) {
+ final Object o = v1.get(i);
+ if (o != null) {
+ result.add(o.toString());
+ } else {
+ result.add(null);
}
- });
+ }
+ return result;
+
+ }
+ });
final Map<Long, HLLCounter> samplingResult = sampling(rowJavaRDD, cubeName, segmentId);
final byte[][] splitKeys = createHTable(cubeName, segmentId, samplingResult);
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index cf2a650..f70fd30 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,14 +17,6 @@
*/
package org.apache.kylin.engine.spark;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -73,9 +65,17 @@ import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.Tuple2;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+
/**
* Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
*/
@@ -83,16 +83,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
protected static final Logger logger = LoggerFactory.getLogger(SparkCubingByLayer.class);
- public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
- .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
- public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
- .withDescription("Cube Segment Id").create("segmentId");
- public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true)
- .withDescription("Configuration Path").create("confPath");
- public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
- .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
- public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
- .withDescription("Hive Intermediate Table").create("hiveTable");
+ public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+ public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
+ public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
+ public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+ public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
private Options options;
@@ -165,14 +160,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
final CubeDesc cubeDesc = cubeInstance.getDescriptor();
final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
- final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(
- EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+ final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
final KylinConfig kylinConfig = cubeDesc.getConfig();
final Broadcast<CubeDesc> vCubeDesc = sc.broadcast(cubeDesc);
final Broadcast<CubeSegment> vCubeSegment = sc.broadcast(cubeSegment);
- final NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(),
- new RowKeyEncoderProvider(vCubeSegment.getValue()));
+ final NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(vCubeSegment.getValue()));
final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new CuboidScheduler(vCubeDesc.getValue()));
final int measureNum = cubeDesc.getMeasures().size();
@@ -197,50 +190,45 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
// encode with dimension encoding, transform to <ByteArray, Object[]> RDD
- final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD()
- .mapToPair(new PairFunction<Row, ByteArray, Object[]>() {
- volatile transient boolean initialized = false;
- BaseCuboidBuilder baseCuboidBuilder = null;
+ final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD().mapToPair(new PairFunction<Row, ByteArray, Object[]>() {
+ volatile transient boolean initialized = false;
+ BaseCuboidBuilder baseCuboidBuilder = null;
- @Override
- public Tuple2<ByteArray, Object[]> call(Row row) throws Exception {
+ @Override
+ public Tuple2<ByteArray, Object[]> call(Row row) throws Exception {
+ if (initialized == false) {
+ synchronized (SparkCubingByLayer.class) {
if (initialized == false) {
- synchronized (SparkCubingByLayer.class) {
- if (initialized == false) {
- prepare();
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
- baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment,
- intermediateTableDesc,
- AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
- MeasureIngester.create(cubeDesc.getMeasures()),
- cubeSegment.buildDictionaryMap());
- initialized = true;
- }
- }
+ prepare();
+ long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+ baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc, AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
+ initialized = true;
}
-
- String[] rowArray = rowToArray(row);
- baseCuboidBuilder.resetAggrs();
- byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
- Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
- return new Tuple2<>(new ByteArray(rowKey), result);
}
+ }
- private String[] rowToArray(Row row) {
- String[] result = new String[row.size()];
- for (int i = 0; i < row.size(); i++) {
- final Object o = row.get(i);
- if (o != null) {
- result[i] = o.toString();
- } else {
- result[i] = null;
- }
- }
- return result;
+ String[] rowArray = rowToArray(row);
+ baseCuboidBuilder.resetAggrs();
+ byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
+ Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
+ return new Tuple2<>(new ByteArray(rowKey), result);
+ }
+
+ private String[] rowToArray(Row row) {
+ String[] result = new String[row.size()];
+ for (int i = 0; i < row.size(); i++) {
+ final Object o = row.get(i);
+ if (o != null) {
+ result[i] = o.toString();
+ } else {
+ result[i] = null;
}
+ }
+ return result;
+ }
- });
+ });
logger.info("encodedBaseRDD partition number: " + encodedBaseRDD.getNumPartitions());
Long totalCount = 0L;
@@ -250,12 +238,10 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
}
final MeasureAggregators measureAggregators = new MeasureAggregators(cubeDesc.getMeasures());
- final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum,
- vCubeDesc.getValue(), measureAggregators);
+ final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators);
BaseCuboidReducerFunction2 reducerFunction2 = baseCuboidReducerFunction;
if (allNormalMeasure == false) {
- reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators,
- needAggr);
+ reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators, needAggr);
}
final int totalLevels = cubeDesc.getBuildLevel();
@@ -271,14 +257,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
saveToHDFS(allRDDs[0], vCubeDesc.getValue(), outputPath, 0, confOverwrite);
// aggregate to ND cuboids
- PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(
- vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
+ PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
for (level = 1; level <= totalLevels; level++) {
partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig);
logger.info("Level " + level + " partition number: " + partition);
- allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition)
- .persist(storageLevel);
+ allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition).persist(storageLevel);
if (kylinConfig.isSparkSanityCheckEnabled() == true) {
sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
}
@@ -299,24 +283,19 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
return partition;
}
- private static void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final CubeDesc cubeDesc,
- final String hdfsBaseLocation, int level, Configuration conf) {
+ private static void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final CubeDesc cubeDesc, final String hdfsBaseLocation, int level, Configuration conf) {
final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
- rdd.mapToPair(
- new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
- BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures());
-
- @Override
- public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(
- Tuple2<ByteArray, Object[]> tuple2) throws Exception {
- ByteBuffer valueBuf = codec.encode(tuple2._2());
- byte[] encodedBytes = new byte[valueBuf.position()];
- System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
- return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()),
- new org.apache.hadoop.io.Text(encodedBytes));
- }
- }).saveAsNewAPIHadoopFile(cuboidOutputPath, org.apache.hadoop.io.Text.class,
- org.apache.hadoop.io.Text.class, SequenceFileOutputFormat.class, conf);
+ rdd.mapToPair(new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
+ BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures());
+
+ @Override
+ public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
+ ByteBuffer valueBuf = codec.encode(tuple2._2());
+ byte[] encodedBytes = new byte[valueBuf.position()];
+ System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
+ return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()), new org.apache.hadoop.io.Text(encodedBytes));
+ }
+ }).saveAsNewAPIHadoopFile(cuboidOutputPath, org.apache.hadoop.io.Text.class, org.apache.hadoop.io.Text.class, SequenceFileOutputFormat.class, conf);
logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath);
}
@@ -366,8 +345,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
RowKeySplitter rowKeySplitter;
transient boolean initialized = false;
- CuboidFlatMap(CubeSegment cubeSegment, CubeDesc cubeDesc, CuboidScheduler cuboidScheduler,
- NDCuboidBuilder ndCuboidBuilder) {
+ CuboidFlatMap(CubeSegment cubeSegment, CubeDesc cubeDesc, CuboidScheduler cuboidScheduler, NDCuboidBuilder ndCuboidBuilder) {
this.cubeSegment = cubeSegment;
this.cubeDesc = cubeDesc;
this.cuboidScheduler = cuboidScheduler;
@@ -396,8 +374,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
List<Tuple2<ByteArray, Object[]>> tuples = new ArrayList(myChildren.size());
for (Long child : myChildren) {
Cuboid childCuboid = Cuboid.findById(cubeDesc, child);
- Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid,
- rowKeySplitter.getSplitBuffers());
+ Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
byte[] newKey = new byte[result.getFirst()];
System.arraycopy(result.getSecond().array(), 0, newKey, 0, result.getFirst());
@@ -411,14 +388,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
//sanity check
- private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel,
- CubeStatsReader cubeStatsReader, final int countMeasureIndex) {
+ private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel, CubeStatsReader cubeStatsReader, final int countMeasureIndex) {
int thisCuboidNum = cubeStatsReader.getCuboidsByLayer(thisLevel).size();
Long count2 = getRDDCountSum(rdd, countMeasureIndex);
if (count2 != totalCount * thisCuboidNum) {
- throw new IllegalStateException(
- String.format("Sanity check failed, level %s, total count(*) is %s; cuboid number %s", thisLevel,
- count2, thisCuboidNum));
+ throw new IllegalStateException(String.format("Sanity check failed, level %s, total count(*) is %s; cuboid number %s", thisLevel, count2, thisCuboidNum));
} else {
logger.info("sanity check success for level " + thisLevel + ", count(*) is " + (count2 / thisCuboidNum));
}
@@ -433,8 +407,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
}
}).reduce(new Function2<Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>>() {
@Override
- public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> longTuple2, Tuple2<ByteArray, Long> longTuple22)
- throws Exception {
+ public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> longTuple2, Tuple2<ByteArray, Long> longTuple22) throws Exception {
return new Tuple2<>(ONE, longTuple2._2() + longTuple22._2());
}
})._2();
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index e05d63e..1ed2235 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -84,8 +84,7 @@ public class SparkExecutable extends AbstractExecutable {
hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
if (StringUtils.isEmpty(hadoopConf)) {
- throw new RuntimeException(
- "kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
+ throw new RuntimeException("kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
}
File hiveConfFile = new File(hadoopConf, "hive-site.xml");
@@ -109,8 +108,7 @@ public class SparkExecutable extends AbstractExecutable {
}
StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append(
- "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+ stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
Map<String, String> sparkConfs = config.getSparkConfigOverride();
for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
@@ -119,8 +117,7 @@ public class SparkExecutable extends AbstractExecutable {
stringBuilder.append("--files %s --jars %s %s %s");
try {
- String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(),
- hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
+ String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
logger.info("cmd: " + cmd);
CliCommandExecutor exec = new CliCommandExecutor();
PatternedLogger patternedLogger = new PatternedLogger(logger);
@@ -133,4 +130,5 @@ public class SparkExecutable extends AbstractExecutable {
}
}
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java
index 68ac1af..a8a4d28 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java
@@ -30,8 +30,7 @@ import scala.Tuple2;
*/
public class IteratorUtils {
- public static <K, V> Iterator<Tuple2<K, V>> merge(final Iterator<Tuple2<K, V>> input,
- final Comparator<K> comparator, final Function<Iterable<V>, V> converter) {
+ public static <K, V> Iterator<Tuple2<K, V>> merge(final Iterator<Tuple2<K, V>> input, final Comparator<K> comparator, final Function<Iterable<V>, V> converter) {
return new Iterator<Tuple2<K, V>>() {
Tuple2<K, V> current = input.hasNext() ? input.next() : null;
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java b/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
index 548a496..8afea55 100644
--- a/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
+++ b/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
@@ -38,8 +38,7 @@ public class BufferedCuboidWriterTest {
final BufferedCuboidWriter bufferedCuboidWriter = new BufferedCuboidWriter(new TupleConverter() {
@Override
public Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record) {
- return new Tuple2<>(Long.valueOf(cuboidId).toString().getBytes(),
- Long.valueOf(cuboidId).toString().getBytes());
+ return new Tuple2<>(Long.valueOf(cuboidId).toString().getBytes(), Long.valueOf(cuboidId).toString().getBytes());
}
});
final int testCount = 10000000;
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/KyroMappingGenerator.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/KyroMappingGenerator.java b/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/KyroMappingGenerator.java
index 095c041..b181d33 100644
--- a/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/KyroMappingGenerator.java
+++ b/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/KyroMappingGenerator.java
@@ -18,21 +18,20 @@
package org.apache.kylin.engine.spark.util;
-import java.io.Serializable;
-import java.util.Set;
-import java.util.TreeSet;
-
import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.measure.MeasureIngester;
import org.reflections.Reflections;
+import java.io.Serializable;
+import java.util.Set;
+import java.util.TreeSet;
+
/**
* Generate Kyro Registrator class, the output will be added into KylinKyroRegistrator manually. No runtime dependency with Reflections.
*/
public class KyroMappingGenerator {
public static void main(String[] args) {
- Set<Class<? extends Serializable>> subTypesOfSerializable = new Reflections("org.apache.kylin")
- .getSubTypesOf(Serializable.class);
+ Set<Class<? extends Serializable>> subTypesOfSerializable = new Reflections("org.apache.kylin").getSubTypesOf(Serializable.class);
String begin = "kyroClasses.add(";
String end = ".class);";
TreeSet<String> sortedSet = new TreeSet();
@@ -40,14 +39,12 @@ public class KyroMappingGenerator {
if (clazz.getCanonicalName() != null)
sortedSet.add(clazz.getCanonicalName());
}
- Set<Class<? extends BytesSerializer>> subTypesOfBytes = new Reflections("org.apache.kylin.metadata.datatype")
- .getSubTypesOf(BytesSerializer.class);
+ Set<Class<? extends BytesSerializer>> subTypesOfBytes = new Reflections("org.apache.kylin.metadata.datatype").getSubTypesOf(BytesSerializer.class);
for (Class clazz : subTypesOfBytes) {
if (clazz.getCanonicalName() != null)
sortedSet.add(clazz.getCanonicalName());
}
- Set<Class<? extends MeasureIngester>> subTypesOfMeasure = new Reflections("org.apache.kylin.measure")
- .getSubTypesOf(MeasureIngester.class);
+ Set<Class<? extends MeasureIngester>> subTypesOfMeasure = new Reflections("org.apache.kylin.measure").getSubTypesOf(MeasureIngester.class);
for (Class clazz : subTypesOfMeasure) {
if (clazz.getCanonicalName() != null)
sortedSet.add(clazz.getCanonicalName());
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java b/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java
index cfc4f5c..33d82f80 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/Driver.java
@@ -74,8 +74,7 @@ public class Driver extends UnregisteredDriver {
try {
DriverManager.registerDriver(new Driver());
} catch (SQLException e) {
- throw new RuntimeException(
- "Error occurred while registering JDBC driver " + Driver.class.getName() + ": " + e.toString());
+ throw new RuntimeException("Error occurred while registering JDBC driver " + Driver.class.getName() + ": " + e.toString());
}
}
@@ -86,8 +85,7 @@ public class Driver extends UnregisteredDriver {
@Override
protected DriverVersion createDriverVersion() {
- return DriverVersion.load(Driver.class, "org-apache-kylin-jdbc.properties", "Kylin JDBC Driver",
- "unknown version", "Kylin", "unknown version");
+ return DriverVersion.load(Driver.class, "org-apache-kylin-jdbc.properties", "Kylin JDBC Driver", "unknown version", "Kylin", "unknown version");
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java b/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java
index 56ce2b4..dfd8d76 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/IRemoteClient.java
@@ -52,7 +52,6 @@ public interface IRemoteClient extends Closeable {
/**
* Execute query remotely and get back result.
*/
- public QueryResult executeQuery(String sql, List<AvaticaParameter> params, List<Object> paramValues,
- Map<String, String> queryToggles) throws IOException;
+ public QueryResult executeQuery(String sql, List<AvaticaParameter> params, List<Object> paramValues, Map<String, String> queryToggles) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java
index 0d43f8d..86c3a5b 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinClient.java
@@ -85,8 +85,7 @@ public class KylinClient implements IRemoteClient {
if (isSSL()) {
try {
SSLSocketFactory sslsf = new SSLSocketFactory(new TrustStrategy() {
- public boolean isTrusted(final X509Certificate[] chain, String authType)
- throws CertificateException {
+ public boolean isTrusted(final X509Certificate[] chain, String authType) throws CertificateException {
// Oh, I am easy...
return true;
}
@@ -251,9 +250,8 @@ public class KylinClient implements IRemoteClient {
throw asIOException(get, response);
}
- List<TableMetaStub> tableMetaStubs = jsonMapper.readValue(response.getEntity().getContent(),
- new TypeReference<List<TableMetaStub>>() {
- });
+ List<TableMetaStub> tableMetaStubs = jsonMapper.readValue(response.getEntity().getContent(), new TypeReference<List<TableMetaStub>>() {
+ });
List<KMetaTable> tables = convertMetaTables(tableMetaStubs);
List<KMetaSchema> schemas = convertMetaSchemas(tables);
@@ -315,21 +313,15 @@ public class KylinClient implements IRemoteClient {
for (ColumnMetaStub columnStub : tableStub.getColumns()) {
columns.add(convertMetaColumn(columnStub));
}
- return new KMetaTable(tableStub.getTABLE_CAT(), tableStub.getTABLE_SCHEM(), tableStub.getTABLE_NAME(),
- tableStub.getTABLE_TYPE(), columns);
+ return new KMetaTable(tableStub.getTABLE_CAT(), tableStub.getTABLE_SCHEM(), tableStub.getTABLE_NAME(), tableStub.getTABLE_TYPE(), columns);
}
private KMetaColumn convertMetaColumn(ColumnMetaStub columnStub) {
- return new KMetaColumn(columnStub.getTABLE_CAT(), columnStub.getTABLE_SCHEM(), columnStub.getTABLE_NAME(),
- columnStub.getCOLUMN_NAME(), columnStub.getDATA_TYPE(), columnStub.getTYPE_NAME(),
- columnStub.getCOLUMN_SIZE(), columnStub.getDECIMAL_DIGITS(), columnStub.getNUM_PREC_RADIX(),
- columnStub.getNULLABLE(), columnStub.getCHAR_OCTET_LENGTH(), columnStub.getORDINAL_POSITION(),
- columnStub.getIS_NULLABLE());
+ return new KMetaColumn(columnStub.getTABLE_CAT(), columnStub.getTABLE_SCHEM(), columnStub.getTABLE_NAME(), columnStub.getCOLUMN_NAME(), columnStub.getDATA_TYPE(), columnStub.getTYPE_NAME(), columnStub.getCOLUMN_SIZE(), columnStub.getDECIMAL_DIGITS(), columnStub.getNUM_PREC_RADIX(), columnStub.getNULLABLE(), columnStub.getCHAR_OCTET_LENGTH(), columnStub.getORDINAL_POSITION(), columnStub.getIS_NULLABLE());
}
@Override
- public QueryResult executeQuery(String sql, List<AvaticaParameter> params, List<Object> paramValues,
- Map<String, String> queryToggles) throws IOException {
+ public QueryResult executeQuery(String sql, List<AvaticaParameter> params, List<Object> paramValues, Map<String, String> queryToggles) throws IOException {
SQLResponseStub queryResp = executeKylinQuery(sql, convertParameters(params, paramValues), queryToggles);
if (queryResp.getIsException())
@@ -354,8 +346,7 @@ public class KylinClient implements IRemoteClient {
return result;
}
- private SQLResponseStub executeKylinQuery(String sql, List<StatementParameter> params,
- Map<String, String> queryToggles) throws IOException {
+ private SQLResponseStub executeKylinQuery(String sql, List<StatementParameter> params, Map<String, String> queryToggles) throws IOException {
String url = baseUrl() + "/kylin/api/query";
String project = conn.getProject();
@@ -397,11 +388,7 @@ public class KylinClient implements IRemoteClient {
Class columnClass = convertType(scm.getColumnType());
ScalarType type = ColumnMetaData.scalar(scm.getColumnType(), scm.getColumnTypeName(), Rep.of(columnClass));
- ColumnMetaData meta = new ColumnMetaData(i, scm.isAutoIncrement(), scm.isCaseSensitive(),
- scm.isSearchable(), scm.isCurrency(), scm.getIsNullable(), scm.isSigned(), scm.getDisplaySize(),
- scm.getLabel(), scm.getName(), scm.getSchemaName(), scm.getPrecision(), scm.getScale(),
- scm.getTableName(), scm.getSchemaName(), type, scm.isReadOnly(), scm.isWritable(), scm.isWritable(),
- columnClass.getCanonicalName());
+ ColumnMetaData meta = new ColumnMetaData(i, scm.isAutoIncrement(), scm.isCaseSensitive(), scm.isSearchable(), scm.isCurrency(), scm.getIsNullable(), scm.isSigned(), scm.getDisplaySize(), scm.getLabel(), scm.getName(), scm.getSchemaName(), scm.getPrecision(), scm.getScale(), scm.getTableName(), scm.getSchemaName(), type, scm.isReadOnly(), scm.isWritable(), scm.isWritable(), columnClass.getCanonicalName());
metas.add(meta);
}
@@ -426,8 +413,7 @@ public class KylinClient implements IRemoteClient {
}
private IOException asIOException(HttpRequestBase request, HttpResponse response) throws IOException {
- return new IOException(request.getMethod() + " failed, error code " + response.getStatusLine().getStatusCode()
- + " and response: " + EntityUtils.toString(response.getEntity()));
+ return new IOException(request.getMethod() + " failed, error code " + response.getStatusLine().getStatusCode() + " and response: " + EntityUtils.toString(response.getEntity()));
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java
index 7fd09d6..6852998 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinConnection.java
@@ -47,8 +47,7 @@ public class KylinConnection extends AvaticaConnection {
private final String project;
private final IRemoteClient remoteClient;
- protected KylinConnection(UnregisteredDriver driver, KylinJdbcFactory factory, String url, Properties info)
- throws SQLException {
+ protected KylinConnection(UnregisteredDriver driver, KylinJdbcFactory factory, String url, Properties info) throws SQLException {
super(driver, factory, url, info);
String odbcUrl = url;
@@ -84,8 +83,7 @@ public class KylinConnection extends AvaticaConnection {
}
@Override
- public AvaticaStatement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
- throws SQLException {
+ public AvaticaStatement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return super.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
}
@@ -102,11 +100,9 @@ public class KylinConnection extends AvaticaConnection {
}
@Override
- public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
- int resultSetHoldability) throws SQLException {
+ public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
Meta.Signature sig = mockPreparedSignature(sql);
- return factory().newPreparedStatement(this, null, sig, resultSetType, resultSetConcurrency,
- resultSetHoldability);
+ return factory().newPreparedStatement(this, null, sig, resultSetType, resultSetConcurrency, resultSetHoldability);
}
// TODO add restful API to prepare SQL, get back expected ResultSetMetaData
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java
index 32bf6ca..6aae983 100644
--- a/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java
+++ b/jdbc/src/main/java/org/apache/kylin/jdbc/KylinJdbcFactory.java
@@ -73,8 +73,7 @@ public class KylinJdbcFactory implements AvaticaFactory {
}
@Override
- public AvaticaConnection newConnection(UnregisteredDriver driver, AvaticaFactory factory, String url,
- Properties info) throws SQLException {
+ public AvaticaConnection newConnection(UnregisteredDriver driver, AvaticaFactory factory, String url, Properties info) throws SQLException {
return new KylinConnection(driver, (KylinJdbcFactory) factory, url, info);
}
@@ -85,23 +84,17 @@ public class KylinJdbcFactory implements AvaticaFactory {
}
@Override
- public AvaticaStatement newStatement(AvaticaConnection connection, StatementHandle h, int resultSetType,
- int resultSetConcurrency, int resultSetHoldability) throws SQLException {
- return new KylinStatement((KylinConnection) connection, h, resultSetType, resultSetConcurrency,
- resultSetHoldability);
+ public AvaticaStatement newStatement(AvaticaConnection connection, StatementHandle h, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return new KylinStatement((KylinConnection) connection, h, resultSetType, resultSetConcurrency, resultSetHoldability);
}
@Override
- public AvaticaPreparedStatement newPreparedStatement(AvaticaConnection connection, StatementHandle h,
- Signature signature, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
- throws SQLException {
- return new KylinPreparedStatement((KylinConnection) connection, h, signature, resultSetType,
- resultSetConcurrency, resultSetHoldability);
+ public AvaticaPreparedStatement newPreparedStatement(AvaticaConnection connection, StatementHandle h, Signature signature, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return new KylinPreparedStatement((KylinConnection) connection, h, signature, resultSetType, resultSetConcurrency, resultSetHoldability);
}
@Override
- public AvaticaResultSet newResultSet(AvaticaStatement statement, QueryState state, Signature signature,
- TimeZone timeZone, Frame firstFrame) throws SQLException {
+ public AvaticaResultSet newResultSet(AvaticaStatement statement, QueryState state, Signature signature, TimeZone timeZone, Frame firstFrame) throws SQLException {
AvaticaResultSetMetaData resultSetMetaData = new AvaticaResultSetMetaData(statement, null, signature);
return new KylinResultSet(statement, state, signature, resultSetMetaData, timeZone, firstFrame);
}