You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/10 03:57:43 UTC
incubator-kylin git commit: KYLIN-738 make hconnection pool more robust by checking if closed
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 247d4f13c -> cf4277891
KYLIN-738 make hconnection pool more robust by checking if closed
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/cf427789
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/cf427789
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/cf427789
Branch: refs/heads/0.8.0
Commit: cf4277891db49cfcdec2615ab43465759e8aad00
Parents: 247d4f1
Author: honma <ho...@ebay.com>
Authored: Wed Jun 10 09:57:24 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jun 10 09:57:24 2015 +0800
----------------------------------------------------------------------
.../common/persistence/HBaseConnection.java | 18 +++--
.../kylin/job/tools/HbaseStreamingInput.java | 21 +++---
.../metadata/filter/DateConditionModifier.java | 73 --------------------
.../filter/TimeConditionLiteralsReplacer.java | 70 +++++++++++++++++++
.../kylin/query/enumerator/OLAPEnumerator.java | 12 ++--
.../filter/DateConditionModifierTest.java | 32 ---------
.../TimeConditionLiteralsReplacerTest.java | 32 +++++++++
7 files changed, 135 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cf427789/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
index ffc066a..a574d0b 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
@@ -70,11 +70,21 @@ public class HBaseConnection {
HConnection connection = ConnPool.get(url);
try {
- // I don't use DCL since recreate a connection is not a big issue.
- if (connection == null) {
- connection = HConnectionManager.createConnection(conf);
- ConnPool.put(url, connection);
+ while (true) {
+ // I don't use DCL since recreate a connection is not a big issue.
+ if (connection == null || connection.isClosed()) {
+ logger.info("connection is null or closed, creating a new one");
+ connection = HConnectionManager.createConnection(conf);
+ ConnPool.put(url, connection);
+ }
+
+ if (connection == null || connection.isClosed()) {
+ Thread.sleep(10000);// wait a while and retry
+ } else {
+ break;
+ }
}
+
} catch (Throwable t) {
logger.error("Error when open connection " + url, t);
throw new StorageException("Error when open connection " + url, t);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cf427789/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java b/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
index e38a574..70086b3 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
@@ -1,5 +1,13 @@
package org.apache.kylin.job.tools;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.kylin.common.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
@@ -9,15 +17,6 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.Semaphore;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
-import org.apache.kylin.common.util.Bytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
/**
*/
public class HbaseStreamingInput {
@@ -28,7 +27,8 @@ public class HbaseStreamingInput {
private static final byte[] QN = "C".getBytes();
public static void createTable(String tableName) throws IOException {
- HBaseAdmin hadmin = new HBaseAdmin(getConnection());
+ HConnection conn = getConnection();
+ HBaseAdmin hadmin = new HBaseAdmin(conn);
try {
boolean tableExist = hadmin.tableExists(tableName);
@@ -49,6 +49,7 @@ public class HbaseStreamingInput {
logger.info("HTable '" + tableName + "' created");
} finally {
+ conn.close();
hadmin.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cf427789/metadata/src/main/java/org/apache/kylin/metadata/filter/DateConditionModifier.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/DateConditionModifier.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/DateConditionModifier.java
deleted file mode 100644
index 08be3de..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/DateConditionModifier.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.apache.kylin.metadata.filter;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import java.util.Collection;
-import java.util.IdentityHashMap;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Calcite passed down all time family constants as GregorianCalendar,
- * we'll have to reformat it to date/time according to column definition
- */
-public class DateConditionModifier implements TupleFilterSerializer.Decorator {
-
- private IdentityHashMap<TupleFilter, DataType> dateCompareTupleChildren;
-
- public DateConditionModifier(TupleFilter root) {
- this.dateCompareTupleChildren = Maps.newIdentityHashMap();
- }
-
- @Override
- public TupleFilter onSerialize(TupleFilter filter) {
-
- if (filter instanceof CompareTupleFilter) {
- CompareTupleFilter cfilter = (CompareTupleFilter) filter;
- List<? extends TupleFilter> children = cfilter.getChildren();
-
- if (children == null || children.size() < 1) {
- throw new IllegalArgumentException("Illegal compare filter: " + cfilter);
- }
-
- TblColRef col = cfilter.getColumn();
- if (col == null || !col.getType().isDateTimeFamily()) {
- return cfilter;
- }
-
- for (TupleFilter child : filter.getChildren()) {
- dateCompareTupleChildren.put(child, col.getType());
- }
- }
-
- if (filter instanceof ConstantTupleFilter && dateCompareTupleChildren.containsKey(filter)) {
- ConstantTupleFilter constantTupleFilter = (ConstantTupleFilter) filter;
- Set<String> newValues = Sets.newHashSet();
- DataType columnType = dateCompareTupleChildren.get(filter);
-
- for (String value : (Collection<String>) constantTupleFilter.getValues()) {
- newValues.add(formatTime(Long.valueOf(value), columnType));
- }
- return new ConstantTupleFilter(newValues);
- }
- return filter;
- }
-
- private String formatTime(long millis, DataType dataType) {
- if (dataType.isDatetime() || dataType.isTime()) {
- throw new RuntimeException("Datetime and time type are not supported yet");
- }
-
- if (dataType.isTimestamp()) {
- return DateFormat.formatToTimeStr(millis);
- } else if (dataType.isDate()) {
- return DateFormat.formatToDateStr(millis);
- } else {
- throw new RuntimeException("Unknown type " + dataType + " to formatTime");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cf427789/metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
new file mode 100644
index 0000000..391768a
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
@@ -0,0 +1,70 @@
+package org.apache.kylin.metadata.filter;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.util.Collection;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Set;
+
+
+public class TimeConditionLiteralsReplacer implements TupleFilterSerializer.Decorator {
+
+ private IdentityHashMap<TupleFilter, DataType> dateCompareTupleChildren;
+
+ public TimeConditionLiteralsReplacer(TupleFilter root) {
+ this.dateCompareTupleChildren = Maps.newIdentityHashMap();
+ }
+
+ @Override
+ public TupleFilter onSerialize(TupleFilter filter) {
+
+ if (filter instanceof CompareTupleFilter) {
+ CompareTupleFilter cfilter = (CompareTupleFilter) filter;
+ List<? extends TupleFilter> children = cfilter.getChildren();
+
+ if (children == null || children.size() < 1) {
+ throw new IllegalArgumentException("Illegal compare filter: " + cfilter);
+ }
+
+ TblColRef col = cfilter.getColumn();
+ if (col == null || !col.getType().isDateTimeFamily()) {
+ return cfilter;
+ }
+
+ for (TupleFilter child : filter.getChildren()) {
+ dateCompareTupleChildren.put(child, col.getType());
+ }
+ }
+
+ if (filter instanceof ConstantTupleFilter && dateCompareTupleChildren.containsKey(filter)) {
+ ConstantTupleFilter constantTupleFilter = (ConstantTupleFilter) filter;
+ Set<String> newValues = Sets.newHashSet();
+ DataType columnType = dateCompareTupleChildren.get(filter);
+
+ for (String value : (Collection<String>) constantTupleFilter.getValues()) {
+ newValues.add(formatTime(Long.valueOf(value), columnType));
+ }
+ return new ConstantTupleFilter(newValues);
+ }
+ return filter;
+ }
+
+ private String formatTime(long millis, DataType dataType) {
+ if (dataType.isDatetime() || dataType.isTime()) {
+ throw new RuntimeException("Datetime and time type are not supported yet");
+ }
+
+ if (dataType.isTimestamp()) {
+ return DateFormat.formatToTimeStr(millis);
+ } else if (dataType.isDate()) {
+ return DateFormat.formatToDateStr(millis);
+ } else {
+ throw new RuntimeException("Unknown type " + dataType + " to formatTime");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cf427789/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
index 63f06f1..768e7da 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
@@ -22,7 +22,7 @@ import net.hydromatic.linq4j.Enumerator;
import net.hydromatic.optiq.DataContext;
import net.hydromatic.optiq.jdbc.OptiqConnection;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.DateConditionModifier;
+import org.apache.kylin.metadata.filter.TimeConditionLiteralsReplacer;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.filter.TupleFilterSerializer;
import org.apache.kylin.metadata.tuple.ITuple;
@@ -108,7 +108,7 @@ public class OLAPEnumerator implements Enumerator<Object[]> {
bindVariable(olapContext.filter);
//modify date condition
- olapContext.filter = modifyDateCondition(olapContext.filter);
+ olapContext.filter = modifyTimeLiterals(olapContext.filter);
olapContext.resetSQLDigest();
// query storage engine
@@ -121,8 +121,12 @@ public class OLAPEnumerator implements Enumerator<Object[]> {
return iterator;
}
- private TupleFilter modifyDateCondition(TupleFilter filter) {
- DateConditionModifier filterDecorator = new DateConditionModifier(filter);
+ /**
+ * Calcite passed down all time family constants as GregorianCalendar,
+ * we'll have to reformat it to date/time according to column definition
+ */
+ private TupleFilter modifyTimeLiterals(TupleFilter filter) {
+ TimeConditionLiteralsReplacer filterDecorator = new TimeConditionLiteralsReplacer(filter);
byte[] bytes = TupleFilterSerializer.serialize(filter, filterDecorator, DictCodeSystem.INSTANCE);
return TupleFilterSerializer.deserialize(bytes, DictCodeSystem.INSTANCE);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cf427789/storage/src/test/java/org/apache/kylin/storage/filter/DateConditionModifierTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/filter/DateConditionModifierTest.java b/storage/src/test/java/org/apache/kylin/storage/filter/DateConditionModifierTest.java
deleted file mode 100644
index 669289e..0000000
--- a/storage/src/test/java/org/apache/kylin/storage/filter/DateConditionModifierTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package org.apache.kylin.storage.filter;
-
-import org.apache.kylin.metadata.filter.*;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.hbase.coprocessor.DictCodeSystem;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- */
-public class DateConditionModifierTest extends FilterBaseTest {
- @Test
- public void basicTest() {
- TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
- ColumnDesc c1 = ColumnDesc.mockup(t1, 2, "CAL_DT", "date");
- TblColRef column = new TblColRef(c1);
-
- CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
- ColumnTupleFilter columnFilter = new ColumnTupleFilter(column);
- compareFilter.addChild(columnFilter);
- ConstantTupleFilter constantFilter = null;
- constantFilter = new ConstantTupleFilter("946684800000");
- compareFilter.addChild(constantFilter);
-
- DateConditionModifier filterDecorator = new DateConditionModifier(compareFilter);
- byte[] bytes = TupleFilterSerializer.serialize(compareFilter, filterDecorator, DictCodeSystem.INSTANCE);
- CompareTupleFilter compareTupleFilter = (CompareTupleFilter) TupleFilterSerializer.deserialize(bytes, DictCodeSystem.INSTANCE);
- Assert.assertEquals("2000-01-01", compareTupleFilter.getFirstValue());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cf427789/storage/src/test/java/org/apache/kylin/storage/filter/TimeConditionLiteralsReplacerTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/filter/TimeConditionLiteralsReplacerTest.java b/storage/src/test/java/org/apache/kylin/storage/filter/TimeConditionLiteralsReplacerTest.java
new file mode 100644
index 0000000..dcbf02c
--- /dev/null
+++ b/storage/src/test/java/org/apache/kylin/storage/filter/TimeConditionLiteralsReplacerTest.java
@@ -0,0 +1,32 @@
+package org.apache.kylin.storage.filter;
+
+import org.apache.kylin.metadata.filter.*;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.hbase.coprocessor.DictCodeSystem;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ */
+public class TimeConditionLiteralsReplacerTest extends FilterBaseTest {
+ @Test
+ public void basicTest() {
+ TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
+ ColumnDesc c1 = ColumnDesc.mockup(t1, 2, "CAL_DT", "date");
+ TblColRef column = new TblColRef(c1);
+
+ CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
+ ColumnTupleFilter columnFilter = new ColumnTupleFilter(column);
+ compareFilter.addChild(columnFilter);
+ ConstantTupleFilter constantFilter = null;
+ constantFilter = new ConstantTupleFilter("946684800000");
+ compareFilter.addChild(constantFilter);
+
+ TimeConditionLiteralsReplacer filterDecorator = new TimeConditionLiteralsReplacer(compareFilter);
+ byte[] bytes = TupleFilterSerializer.serialize(compareFilter, filterDecorator, DictCodeSystem.INSTANCE);
+ CompareTupleFilter compareTupleFilter = (CompareTupleFilter) TupleFilterSerializer.deserialize(bytes, DictCodeSystem.INSTANCE);
+ Assert.assertEquals("2000-01-01", compareTupleFilter.getFirstValue());
+ }
+}