You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/10 03:57:43 UTC

incubator-kylin git commit: KYLIN-738 make hconnection pool more robust by checking if closed

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 247d4f13c -> cf4277891


KYLIN-738 make hconnection pool more robust by checking if closed


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/cf427789
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/cf427789
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/cf427789

Branch: refs/heads/0.8.0
Commit: cf4277891db49cfcdec2615ab43465759e8aad00
Parents: 247d4f1
Author: honma <ho...@ebay.com>
Authored: Wed Jun 10 09:57:24 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jun 10 09:57:24 2015 +0800

----------------------------------------------------------------------
 .../common/persistence/HBaseConnection.java     | 18 +++--
 .../kylin/job/tools/HbaseStreamingInput.java    | 21 +++---
 .../metadata/filter/DateConditionModifier.java  | 73 --------------------
 .../filter/TimeConditionLiteralsReplacer.java   | 70 +++++++++++++++++++
 .../kylin/query/enumerator/OLAPEnumerator.java  | 12 ++--
 .../filter/DateConditionModifierTest.java       | 32 ---------
 .../TimeConditionLiteralsReplacerTest.java      | 32 +++++++++
 7 files changed, 135 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cf427789/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
index ffc066a..a574d0b 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
@@ -70,11 +70,21 @@ public class HBaseConnection {
 
         HConnection connection = ConnPool.get(url);
         try {
-            // I don't use DCL since recreate a connection is not a big issue.
-            if (connection == null) {
-                connection = HConnectionManager.createConnection(conf);
-                ConnPool.put(url, connection);
+            while (true) {
+                // No double-checked locking (DCL) here, since recreating a connection is not a big issue.
+                if (connection == null || connection.isClosed()) {
+                    logger.info("connection is null or closed, creating a new one");
+                    connection = HConnectionManager.createConnection(conf);
+                    ConnPool.put(url, connection);
+                }
+
+                if (connection == null || connection.isClosed()) {
+                    Thread.sleep(10000);// wait a while and retry
+                } else {
+                    break;
+                }
             }
+
         } catch (Throwable t) {
             logger.error("Error when open connection " + url, t);
             throw new StorageException("Error when open connection " + url, t);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cf427789/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java b/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
index e38a574..70086b3 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/HbaseStreamingInput.java
@@ -1,5 +1,13 @@
 package org.apache.kylin.job.tools;
 
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.kylin.common.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
@@ -9,15 +17,6 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Semaphore;
 
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
-import org.apache.kylin.common.util.Bytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
 /**
  */
 public class HbaseStreamingInput {
@@ -28,7 +27,8 @@ public class HbaseStreamingInput {
     private static final byte[] QN = "C".getBytes();
 
     public static void createTable(String tableName) throws IOException {
-        HBaseAdmin hadmin = new HBaseAdmin(getConnection());
+        HConnection conn = getConnection();
+        HBaseAdmin hadmin = new HBaseAdmin(conn);
 
         try {
             boolean tableExist = hadmin.tableExists(tableName);
@@ -49,6 +49,7 @@ public class HbaseStreamingInput {
 
             logger.info("HTable '" + tableName + "' created");
         } finally {
+            conn.close();
             hadmin.close();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cf427789/metadata/src/main/java/org/apache/kylin/metadata/filter/DateConditionModifier.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/DateConditionModifier.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/DateConditionModifier.java
deleted file mode 100644
index 08be3de..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/filter/DateConditionModifier.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.apache.kylin.metadata.filter;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import java.util.Collection;
-import java.util.IdentityHashMap;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Calcite passed down all time family constants as GregorianCalendar,
- * we'll have to reformat it to date/time according to column definition
- */
-public class DateConditionModifier implements TupleFilterSerializer.Decorator {
-
-    private IdentityHashMap<TupleFilter, DataType> dateCompareTupleChildren;
-
-    public DateConditionModifier(TupleFilter root) {
-        this.dateCompareTupleChildren = Maps.newIdentityHashMap();
-    }
-
-    @Override
-    public TupleFilter onSerialize(TupleFilter filter) {
-
-        if (filter instanceof CompareTupleFilter) {
-            CompareTupleFilter cfilter = (CompareTupleFilter) filter;
-            List<? extends TupleFilter> children = cfilter.getChildren();
-
-            if (children == null || children.size() < 1) {
-                throw new IllegalArgumentException("Illegal compare filter: " + cfilter);
-            }
-
-            TblColRef col = cfilter.getColumn();
-            if (col == null || !col.getType().isDateTimeFamily()) {
-                return cfilter;
-            }
-
-            for (TupleFilter child : filter.getChildren()) {
-                dateCompareTupleChildren.put(child, col.getType());
-            }
-        }
-
-        if (filter instanceof ConstantTupleFilter && dateCompareTupleChildren.containsKey(filter)) {
-            ConstantTupleFilter constantTupleFilter = (ConstantTupleFilter) filter;
-            Set<String> newValues = Sets.newHashSet();
-            DataType columnType = dateCompareTupleChildren.get(filter);
-
-            for (String value : (Collection<String>) constantTupleFilter.getValues()) {
-                newValues.add(formatTime(Long.valueOf(value), columnType));
-            }
-            return new ConstantTupleFilter(newValues);
-        }
-        return filter;
-    }
-
-    private String formatTime(long millis, DataType dataType) {
-        if (dataType.isDatetime() || dataType.isTime()) {
-            throw new RuntimeException("Datetime and time type are not supported yet");
-        }
-
-        if (dataType.isTimestamp()) {
-            return DateFormat.formatToTimeStr(millis);
-        } else if (dataType.isDate()) {
-            return DateFormat.formatToDateStr(millis);
-        } else {
-            throw new RuntimeException("Unknown type " + dataType + " to formatTime");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cf427789/metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java b/metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
new file mode 100644
index 0000000..391768a
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
@@ -0,0 +1,70 @@
+package org.apache.kylin.metadata.filter;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.util.Collection;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Set;
+
+
+public class TimeConditionLiteralsReplacer implements TupleFilterSerializer.Decorator {
+
+    private IdentityHashMap<TupleFilter, DataType> dateCompareTupleChildren;
+
+    public TimeConditionLiteralsReplacer(TupleFilter root) {
+        this.dateCompareTupleChildren = Maps.newIdentityHashMap();
+    }
+
+    @Override
+    public TupleFilter onSerialize(TupleFilter filter) {
+
+        if (filter instanceof CompareTupleFilter) {
+            CompareTupleFilter cfilter = (CompareTupleFilter) filter;
+            List<? extends TupleFilter> children = cfilter.getChildren();
+
+            if (children == null || children.size() < 1) {
+                throw new IllegalArgumentException("Illegal compare filter: " + cfilter);
+            }
+
+            TblColRef col = cfilter.getColumn();
+            if (col == null || !col.getType().isDateTimeFamily()) {
+                return cfilter;
+            }
+
+            for (TupleFilter child : filter.getChildren()) {
+                dateCompareTupleChildren.put(child, col.getType());
+            }
+        }
+
+        if (filter instanceof ConstantTupleFilter && dateCompareTupleChildren.containsKey(filter)) {
+            ConstantTupleFilter constantTupleFilter = (ConstantTupleFilter) filter;
+            Set<String> newValues = Sets.newHashSet();
+            DataType columnType = dateCompareTupleChildren.get(filter);
+
+            for (String value : (Collection<String>) constantTupleFilter.getValues()) {
+                newValues.add(formatTime(Long.valueOf(value), columnType));
+            }
+            return new ConstantTupleFilter(newValues);
+        }
+        return filter;
+    }
+
+    private String formatTime(long millis, DataType dataType) {
+        if (dataType.isDatetime() || dataType.isTime()) {
+            throw new RuntimeException("Datetime and time type are not supported yet");
+        }
+
+        if (dataType.isTimestamp()) {
+            return DateFormat.formatToTimeStr(millis);
+        } else if (dataType.isDate()) {
+            return DateFormat.formatToDateStr(millis);
+        } else {
+            throw new RuntimeException("Unknown type " + dataType + " to formatTime");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cf427789/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
index 63f06f1..768e7da 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
@@ -22,7 +22,7 @@ import net.hydromatic.linq4j.Enumerator;
 import net.hydromatic.optiq.DataContext;
 import net.hydromatic.optiq.jdbc.OptiqConnection;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.DateConditionModifier;
+import org.apache.kylin.metadata.filter.TimeConditionLiteralsReplacer;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilterSerializer;
 import org.apache.kylin.metadata.tuple.ITuple;
@@ -108,7 +108,7 @@ public class OLAPEnumerator implements Enumerator<Object[]> {
         bindVariable(olapContext.filter);
 
         //modify date condition
-        olapContext.filter = modifyDateCondition(olapContext.filter);
+        olapContext.filter = modifyTimeLiterals(olapContext.filter);
         olapContext.resetSQLDigest();
 
         // query storage engine
@@ -121,8 +121,12 @@ public class OLAPEnumerator implements Enumerator<Object[]> {
         return iterator;
     }
 
-    private TupleFilter modifyDateCondition(TupleFilter filter) {
-        DateConditionModifier filterDecorator = new DateConditionModifier(filter);
+    /**
+     * Calcite passes down all time-family constants as GregorianCalendar,
+     * so we have to reformat them to date/time according to the column definition.
+     */
+    private TupleFilter modifyTimeLiterals(TupleFilter filter) {
+        TimeConditionLiteralsReplacer filterDecorator = new TimeConditionLiteralsReplacer(filter);
         byte[] bytes = TupleFilterSerializer.serialize(filter, filterDecorator, DictCodeSystem.INSTANCE);
         return TupleFilterSerializer.deserialize(bytes, DictCodeSystem.INSTANCE);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cf427789/storage/src/test/java/org/apache/kylin/storage/filter/DateConditionModifierTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/filter/DateConditionModifierTest.java b/storage/src/test/java/org/apache/kylin/storage/filter/DateConditionModifierTest.java
deleted file mode 100644
index 669289e..0000000
--- a/storage/src/test/java/org/apache/kylin/storage/filter/DateConditionModifierTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package org.apache.kylin.storage.filter;
-
-import org.apache.kylin.metadata.filter.*;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.hbase.coprocessor.DictCodeSystem;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- */
-public class DateConditionModifierTest extends FilterBaseTest {
-    @Test
-    public void basicTest() {
-        TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
-        ColumnDesc c1 = ColumnDesc.mockup(t1, 2, "CAL_DT", "date");
-        TblColRef column = new TblColRef(c1);
-
-        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
-        ColumnTupleFilter columnFilter = new ColumnTupleFilter(column);
-        compareFilter.addChild(columnFilter);
-        ConstantTupleFilter constantFilter = null;
-        constantFilter = new ConstantTupleFilter("946684800000");
-        compareFilter.addChild(constantFilter);
-
-        DateConditionModifier filterDecorator = new DateConditionModifier(compareFilter);
-        byte[] bytes = TupleFilterSerializer.serialize(compareFilter, filterDecorator, DictCodeSystem.INSTANCE);
-        CompareTupleFilter compareTupleFilter = (CompareTupleFilter) TupleFilterSerializer.deserialize(bytes, DictCodeSystem.INSTANCE);
-        Assert.assertEquals("2000-01-01", compareTupleFilter.getFirstValue());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/cf427789/storage/src/test/java/org/apache/kylin/storage/filter/TimeConditionLiteralsReplacerTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/filter/TimeConditionLiteralsReplacerTest.java b/storage/src/test/java/org/apache/kylin/storage/filter/TimeConditionLiteralsReplacerTest.java
new file mode 100644
index 0000000..dcbf02c
--- /dev/null
+++ b/storage/src/test/java/org/apache/kylin/storage/filter/TimeConditionLiteralsReplacerTest.java
@@ -0,0 +1,32 @@
+package org.apache.kylin.storage.filter;
+
+import org.apache.kylin.metadata.filter.*;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.hbase.coprocessor.DictCodeSystem;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Checks that a millisecond literal compared against a date column is rewritten to a date string.
+ */
+public class TimeConditionLiteralsReplacerTest extends FilterBaseTest {
+    @Test
+    public void basicTest() {
+        TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT");
+        ColumnDesc c1 = ColumnDesc.mockup(t1, 2, "CAL_DT", "date");
+        TblColRef column = new TblColRef(c1);
+
+        CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
+        ColumnTupleFilter columnFilter = new ColumnTupleFilter(column);
+        compareFilter.addChild(columnFilter);
+        ConstantTupleFilter constantFilter = null;
+        constantFilter = new ConstantTupleFilter("946684800000");
+        compareFilter.addChild(constantFilter);
+
+        TimeConditionLiteralsReplacer filterDecorator = new TimeConditionLiteralsReplacer(compareFilter);
+        byte[] bytes = TupleFilterSerializer.serialize(compareFilter, filterDecorator, DictCodeSystem.INSTANCE);
+        CompareTupleFilter compareTupleFilter = (CompareTupleFilter) TupleFilterSerializer.deserialize(bytes, DictCodeSystem.INSTANCE);
+        Assert.assertEquals("2000-01-01", compareTupleFilter.getFirstValue());
+    }
+}