You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metamodel.apache.org by ka...@apache.org on 2014/03/24 22:16:58 UTC

[9/9] git commit: METAMODEL-42: Applied patch for HBase GET support

METAMODEL-42: Applied patch for HBase GET support

Project: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/commit/a51ff94a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/tree/a51ff94a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/diff/a51ff94a

Branch: refs/heads/hbase-module
Commit: a51ff94a727548263984c23e445f1711ff88b5e6
Parents: e2c3d3f
Author: Tomasz Guzialek <To...@humaninference.com>
Authored: Mon Mar 24 22:15:39 2014 +0100
Committer: Kasper Sørensen <i....@gmail.com>
Committed: Mon Mar 24 22:15:39 2014 +0100

----------------------------------------------------------------------
 .../metamodel/QueryPostprocessDataContext.java  | 87 +++++++++++++++-----
 .../QueryPostprocessDataContextTest.java        | 46 ++++++++++-
 .../metamodel/hbase/HBaseDataContext.java       | 48 ++++++++++-
 .../metamodel/hbase/HBaseDataContextTest.java   | 19 ++++-
 4 files changed, 175 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/a51ff94a/core/src/main/java/org/apache/metamodel/QueryPostprocessDataContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/metamodel/QueryPostprocessDataContext.java b/core/src/main/java/org/apache/metamodel/QueryPostprocessDataContext.java
index 66565df..1791365 100644
--- a/core/src/main/java/org/apache/metamodel/QueryPostprocessDataContext.java
+++ b/core/src/main/java/org/apache/metamodel/QueryPostprocessDataContext.java
@@ -96,28 +96,58 @@ public abstract class QueryPostprocessDataContext extends AbstractDataContext im
         final List<FilterItem> havingItems = query.getHavingClause().getItems();
         final List<OrderByItem> orderByItems = query.getOrderByClause().getItems();
 
-        // check for approximate SELECT COUNT(*) queries
-        if (fromItems.size() == 1 && selectItems.size() == 1 && groupByItems.isEmpty() && havingItems.isEmpty()) {
-            final SelectItem selectItem = query.getSelectClause().getItem(0);
-            if (SelectItem.isCountAllItem(selectItem)) {
-                final boolean functionApproximationAllowed = selectItem.isFunctionApproximationAllowed();
-                final FromItem fromItem = query.getFromClause().getItem(0);
-                final Table table = fromItem.getTable();
-                if (table != null) {
-                    if (isMainSchemaTable(table)) {
-                        logger.debug("Query is a COUNT query with {} where items. Trying executeCountQuery(...)",
-                                whereItems.size());
-                        final Number count = executeCountQuery(table, whereItems, functionApproximationAllowed);
-                        if (count == null) {
-                            logger.debug("DataContext did not return any count query results. Proceeding with manual counting.");
-                        } else {
-                            List<Row> data = new ArrayList<Row>(1);
-                            final DataSetHeader header = new SimpleDataSetHeader(new SelectItem[] { selectItem });
-                            data.add(new DefaultRow(header, new Object[] { count }));
-                            return new InMemoryDataSet(header, data);
+        // check certain common query types that can often be optimized by
+        // subclasses
+        if (fromItems.size() == 1 && groupByItems.isEmpty() && havingItems.isEmpty()) {
+
+            final FromItem fromItem = query.getFromClause().getItem(0);
+            final Table table = fromItem.getTable();
+            if (table != null) {
+                // check for approximate SELECT COUNT(*) queries
+                if (selectItems.size() == 1) {
+                    final SelectItem selectItem = query.getSelectClause().getItem(0);
+                    if (SelectItem.isCountAllItem(selectItem)) {
+                        final boolean functionApproximationAllowed = selectItem.isFunctionApproximationAllowed();
+                        if (isMainSchemaTable(table)) {
+                            logger.debug("Query is a COUNT query with {} where items. Trying executeCountQuery(...)",
+                                    whereItems.size());
+                            final Number count = executeCountQuery(table, whereItems, functionApproximationAllowed);
+                            if (count == null) {
+                                logger.debug("DataContext did not return any count query results. Proceeding with manual counting.");
+                            } else {
+                                List<Row> data = new ArrayList<Row>(1);
+                                final DataSetHeader header = new SimpleDataSetHeader(new SelectItem[] { selectItem });
+                                data.add(new DefaultRow(header, new Object[] { count }));
+                                return new InMemoryDataSet(header, data);
+                            }
+                        }
+                    }
+                }
+
+                // check for lookup query by primary key
+                if (whereItems.size() == 1) {
+                    final FilterItem whereItem = whereItems.get(0);
+                    final SelectItem selectItem = whereItem.getSelectItem();
+                    if (!whereItem.isCompoundFilter() && selectItem != null && selectItem.getColumn() != null) {
+                        final Column column = selectItem.getColumn();
+                        if (column.isPrimaryKey() && whereItem.getOperator() == OperatorType.EQUALS_TO) {
+                            logger.debug("Query is a primary key lookup query. Trying executePrimaryKeyLookupQuery(...)");
+                            if (table != null) {
+                                if (isMainSchemaTable(table)) {
+                                    final Object operand = whereItem.getOperand();
+                                    final Row row = executePrimaryKeyLookupQuery(table, selectItems, operand);
+                                    if (row == null) {
+                                        logger.debug("DataContext did not return any GET query results. Proceeding with manual lookup.");
+                                    } else {
+                                        final DataSetHeader header = new SimpleDataSetHeader(selectItems);
+                                        return new InMemoryDataSet(header, row);
+                                    }
+                                }
+                            }
                         }
                     }
                 }
+
             }
         }
 
@@ -206,6 +236,25 @@ public abstract class QueryPostprocessDataContext extends AbstractDataContext im
         return null;
     }
 
+    /**
+     * Executes a query which obtains a row by primary key (as defined by
+     * {@link Column#isPrimaryKey()}). This method is provided to allow
+     * subclasses to optimize lookup queries since they are quite common and
+     * often a datastore can retrieve the row using some specialized means which
+     * is much more performant than scanning all records manually.
+     * 
+     * <p>
+     * It is only consulted for queries on a single main schema table, without
+     * GROUP BY / HAVING, whose WHERE clause is one non-compound
+     * {@code EQUALS_TO} filter on a primary key column.
+     * </p>
+     * 
+     * @param table
+     *            the table on which the lookup is requested.
+     * @param selectItems
+     *            the items to select from the lookup query; the values of the
+     *            returned row should correspond to these items, in order.
+     * @param keyValue
+     *            the primary key value that is specified in the lookup query.
+     * @return the row with the given primary key, or null if this data context
+     *         cannot (or chooses not to) perform the lookup itself, in which
+     *         case the query is evaluated the regular way (by scanning). This
+     *         default implementation always returns null.
+     */
+    protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems, Object keyValue) {
+        return null;
+    }
+
     protected DataSet materializeFromItem(final FromItem fromItem, final List<SelectItem> selectItems) {
         DataSet dataSet;
         JoinType joinType = fromItem.getJoin();

http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/a51ff94a/core/src/test/java/org/apache/metamodel/QueryPostprocessDataContextTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/metamodel/QueryPostprocessDataContextTest.java b/core/src/test/java/org/apache/metamodel/QueryPostprocessDataContextTest.java
index 3933855..163af0c 100644
--- a/core/src/test/java/org/apache/metamodel/QueryPostprocessDataContextTest.java
+++ b/core/src/test/java/org/apache/metamodel/QueryPostprocessDataContextTest.java
@@ -27,7 +27,9 @@ import javax.swing.table.TableModel;
 
 import org.apache.metamodel.data.DataSet;
 import org.apache.metamodel.data.DataSetTableModel;
+import org.apache.metamodel.data.DefaultRow;
 import org.apache.metamodel.data.Row;
+import org.apache.metamodel.data.SimpleDataSetHeader;
 import org.apache.metamodel.query.CompiledQuery;
 import org.apache.metamodel.query.FilterItem;
 import org.apache.metamodel.query.FromItem;
@@ -570,7 +572,7 @@ public class QueryPostprocessDataContextTest extends MetaModelTestCase {
         }
         assertFalse(data.next());
     }
-    
+
     public void testJoinAndFirstRow() throws Exception {
         DataSet data;
 
@@ -582,7 +584,7 @@ public class QueryPostprocessDataContextTest extends MetaModelTestCase {
         q.select(table2.getColumns());
         data = dc.executeQuery(q);
         assertEquals(48, data.toObjectArrays().size());
-        
+
         q.setFirstRow(3);
         data = dc.executeQuery(q);
         assertEquals(46, data.toObjectArrays().size());
@@ -858,4 +860,44 @@ public class QueryPostprocessDataContextTest extends MetaModelTestCase {
         assertEquals("Row[values=[1337]]", ds.getRow().toString());
         assertFalse(ds.next());
     }
+
+    public void testExecutePrimaryKeyLookupQuery() throws Exception {
+        QueryPostprocessDataContext dc = new QueryPostprocessDataContext() {
+            @Override
+            protected DataSet materializeMainSchemaTable(Table table, Column[] columns, int maxRows) {
+                // the lookup must be served by executePrimaryKeyLookupQuery(...), never by a full scan
+                throw new UnsupportedOperationException("Table should not be materialized for a primary key lookup");
+            }
+
+            @Override
+            protected Number executeCountQuery(Table table, List<FilterItem> whereItems,
+                    boolean functionApproximationAllowed) {
+                return null;
+            }
+
+            @Override
+            protected String getMainSchemaName() throws MetaModelException {
+                return "sch";
+            }
+
+            @Override
+            protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems, Object keyValue) {
+                // verify that the hook receives exactly what the query specified
+                assertEquals("tabl", table.getName());
+                assertEquals(1, selectItems.size());
+                assertEquals("col2", selectItems.get(0).getColumn().getName());
+                assertEquals("foo", keyValue);
+                return new DefaultRow(new SimpleDataSetHeader(selectItems), new Object[] { "hello world" });
+            }
+
+            @Override
+            protected Schema getMainSchema() throws MetaModelException {
+                MutableSchema schema = new MutableSchema(getMainSchemaName());
+                MutableTable table = new MutableTable("tabl").setSchema(schema);
+                table.addColumn(new MutableColumn("col1").setTable(table).setPrimaryKey(true));
+                table.addColumn(new MutableColumn("col2").setTable(table));
+                return schema.addTable(table);
+            }
+        };
+
+        DataSet result = dc.query().from("tabl").select("col2").where("col1").eq("foo").execute();
+        try {
+            assertTrue(result.next());
+            assertEquals("Row[values=[hello world]]", result.getRow().toString());
+            assertFalse(result.next());
+        } finally {
+            result.close();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/a51ff94a/hbase/src/main/java/org/apache/metamodel/hbase/HBaseDataContext.java
----------------------------------------------------------------------
diff --git a/hbase/src/main/java/org/apache/metamodel/hbase/HBaseDataContext.java b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseDataContext.java
index b4da5d0..78d3e33 100644
--- a/hbase/src/main/java/org/apache/metamodel/hbase/HBaseDataContext.java
+++ b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseDataContext.java
@@ -19,32 +19,44 @@
 package org.apache.metamodel.hbase;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.metamodel.DataContext;
 import org.apache.metamodel.MetaModelException;
 import org.apache.metamodel.QueryPostprocessDataContext;
 import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.DataSetHeader;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.data.SimpleDataSetHeader;
 import org.apache.metamodel.query.FilterItem;
+import org.apache.metamodel.query.SelectItem;
 import org.apache.metamodel.schema.Column;
 import org.apache.metamodel.schema.MutableSchema;
 import org.apache.metamodel.schema.Schema;
 import org.apache.metamodel.schema.Table;
 import org.apache.metamodel.util.FileHelper;
 import org.apache.metamodel.util.SimpleTableDef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * MetaModel adaptor for Apache HBase.
  */
 public class HBaseDataContext extends QueryPostprocessDataContext {
 
+    private static final Logger logger = LoggerFactory.getLogger(HBaseDataContext.class);
+
     public static final String FIELD_ID = "_id";
 
     private final HBaseConfiguration _configuration;
@@ -159,7 +171,7 @@ public class HBaseDataContext extends QueryPostprocessDataContext {
             try {
                 while (scanner.next() != null) {
                     result++;
-                }                
+                }
             } finally {
                 scanner.close();
             }
@@ -170,6 +182,22 @@ public class HBaseDataContext extends QueryPostprocessDataContext {
     }
 
     @Override
+    protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems, Object keyValue) {
+        // Serves primary key lookups with a single HBase GET on the row key
+        // instead of scanning the whole table.
+        final HTableInterface hTable = _tablePool.getTable(table.getName());
+        final Get get = new Get(ByteUtils.toBytes(keyValue));
+        try {
+            final Result result = hTable.get(get);
+            if (result == null || result.isEmpty()) {
+                // HBase answers a GET for a non-existent row key with an empty
+                // Result. Wrapping it would fabricate a row that is not in the
+                // table, so return null and let the caller fall back to the
+                // regular query evaluation, which then finds no matching row.
+                logger.debug("HBase GET returned no row for key: {}", keyValue);
+                return null;
+            }
+            final DataSetHeader header = new SimpleDataSetHeader(selectItems);
+            return new HBaseRow(header, result);
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to execute HBase get operation with key: " + keyValue, e);
+        } finally {
+            FileHelper.safeClose(hTable);
+        }
+    }
+
+    @Override
     protected DataSet materializeMainSchemaTable(Table table, Column[] columns, int maxRows) {
         final Scan scan = new Scan();
         for (Column column : columns) {
@@ -184,7 +212,9 @@ public class HBaseDataContext extends QueryPostprocessDataContext {
             }
         }
 
-        scan.setMaxResultSize(maxRows);
+        if (maxRows > 0) {
+            setMaxRows(scan, maxRows);
+        }
 
         final HTableInterface hTable = _tablePool.getTable(table.getName());
         try {
@@ -196,4 +226,18 @@ public class HBaseDataContext extends QueryPostprocessDataContext {
         }
     }
 
+    /**
+     * Limits the number of rows returned by the given scan.
+     * 
+     * Scan.setMaxResultSize(long) is deliberately NOT used: it caps the size
+     * (in bytes) of the results returned per scanner call rather than the
+     * number of rows, so it cannot implement maxRows. A PageFilter is used
+     * instead. Per the HBase documentation, PageFilter is evaluated separately
+     * on each region server, so the scan may still return somewhat more than
+     * maxRows rows in total.
+     * 
+     * @param scan
+     *            the scan to restrict; any filter already set on it is kept.
+     * @param maxRows
+     *            the (positive) maximum number of rows to fetch.
+     */
+    private void setMaxRows(Scan scan, int maxRows) {
+        final PageFilter pageFilter = new PageFilter(maxRows);
+        if (scan.getFilter() == null) {
+            scan.setFilter(pageFilter);
+        } else {
+            // preserve the existing filter; the page filter goes last so that
+            // it only counts rows the existing filter has accepted
+            final FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+            filterList.addFilter(scan.getFilter());
+            filterList.addFilter(pageFilter);
+            scan.setFilter(filterList);
+        }
+        logger.debug("Set maxRows={} on HBase scan using PageFilter", maxRows);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/a51ff94a/hbase/src/test/java/org/apache/metamodel/hbase/HBaseDataContextTest.java
----------------------------------------------------------------------
diff --git a/hbase/src/test/java/org/apache/metamodel/hbase/HBaseDataContextTest.java b/hbase/src/test/java/org/apache/metamodel/hbase/HBaseDataContextTest.java
index 0782231..055149e 100644
--- a/hbase/src/test/java/org/apache/metamodel/hbase/HBaseDataContextTest.java
+++ b/hbase/src/test/java/org/apache/metamodel/hbase/HBaseDataContextTest.java
@@ -107,8 +107,9 @@ public class HBaseDataContextTest extends TestCase {
         }
 
         // query only id
-        final DataSet dataSet4 = _dataContext.query().from(EXAMPLE_TABLE_NAME)
-                .select(HBaseDataContext.FIELD_ID).execute();
+        final DataSet dataSet4 = _dataContext.query().from(EXAMPLE_TABLE_NAME).select(HBaseDataContext.FIELD_ID)
+                .execute();
+
         try {
             assertTrue(dataSet4.next());
             assertEquals("Row[values=[junit1]]", dataSet4.getRow().toString());
@@ -118,6 +119,20 @@ public class HBaseDataContextTest extends TestCase {
         } finally {
             dataSet4.close();
         }
+
+        // primary key lookup query - using GET
+        final DataSet dataSet5 = _dataContext.query().from(EXAMPLE_TABLE_NAME).select(HBaseDataContext.FIELD_ID)
+                .where(HBaseDataContext.FIELD_ID).eq("junit1").execute();
+
+        try {
+            assertTrue(dataSet5.next());
+            assertEquals("Row[values=[junit1]]", dataSet5.getRow().toString());
+            assertFalse(dataSet5.next());
+        } finally {
+            dataSet5.close();
+        }
+
+        // TODO: Check if really GET was used instead of SCAN
     }
 
     private void insertRecordsNatively() throws Exception {