You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2009/09/01 01:03:33 UTC

svn commit: r809778 - in /hadoop/hive/trunk: ./ contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/ contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/ contrib/src/test/queries/clientpositive/ contrib/src/test/results/clientposi...

Author: namit
Date: Mon Aug 31 23:03:33 2009
New Revision: 809778

URL: http://svn.apache.org/viewvc?rev=809778&view=rev
Log:
HIVE-645. UDF to run JDBC
(Edward Capriolo via namit)


Added:
    hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/
    hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/
    hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFDBOutput.java
    hadoop/hive/trunk/contrib/src/test/queries/clientpositive/dboutput.q
    hadoop/hive/trunk/contrib/src/test/results/clientpositive/dboutput.q.out
Modified:
    hadoop/hive/trunk/CHANGES.txt

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=809778&r1=809777&r2=809778&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Mon Aug 31 23:03:33 2009
@@ -30,6 +30,9 @@
     HIVE-792. Support "add archive" in addition to "add file" and "add jar".
     (He Yongqiang via zshao)
 
+    HIVE-645. UDF to run JDBC
+    (Edward Capriolo via namit)
+
   IMPROVEMENTS
 
     HIVE-760. Add version info to META-INF/MANIFEST.MF.

Added: hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFDBOutput.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFDBOutput.java?rev=809778&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFDBOutput.java (added)
+++ hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFDBOutput.java Mon Aug 31 23:03:33 2009
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.genericudf.example;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hive.ql.udf.generic.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.io.IntWritable;
+
+/**
+* GenericUDFDBOutput is designed to output data directly from Hive to a JDBC datastore. 
+* This UDF is useful for exporting small to medium summaries that have a unique key.
+* 
+* Due to the nature of hadoop, individual mappers, reducers or entire jobs can fail. 
+* If a failure occurs a mapper or reducer may be retried. This UDF has no way of 
+* detecting failures or rolling back a transaction. Consequently, you should only 
+* use this to export to a table with a unique key. The unique key should safeguard 
+* against duplicate data.
+* 
+* Use hive's ADD JAR feature to add your JDBC Driver to the distributed cache,
+* otherwise GenericUDFDBOutput will fail.
+*/
+public class GenericUDFDBOutput extends GenericUDF {
+  private static Log LOG = LogFactory.getLog(GenericUDFDBOutput.class.getName());
+
+  ObjectInspector[] argumentOI;
+  GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;
+  Connection connection = null;
+  private String url;
+  private String user;
+  private String pass;
+  private IntWritable result = new IntWritable(-1);
+  /**
+  * @param arguments 
+  * argument 0 is the JDBC connection string
+  * argument 1 is the user name
+  * argument 2 is the password
+  * argument 3 is an SQL query to be used in the PreparedStatement
+  * argument (4-n) The remaining arguments must be primitive and are passed to the PreparedStatement object
+  */
+  public ObjectInspector initialize(ObjectInspector[] arguments)
+      throws UDFArgumentTypeException {
+    this.argumentOI = arguments;
+
+    //this should be connection url,username,password,query,column1[,columnn]*
+    for (int i=0;i<4;i++){
+      if ( arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
+        PrimitiveObjectInspector poi = ((PrimitiveObjectInspector)arguments[i]);
+        
+        if (! (poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)){
+          throw new UDFArgumentTypeException(i,
+            "The argument of function  should be \"" + Constants.STRING_TYPE_NAME
+            + "\", but \"" + arguments[i].getTypeName() + "\" is found");
+        }
+      }
+    }
+    for (int i=4;i<arguments.length;i++){
+      if ( arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {
+        throw new UDFArgumentTypeException(i,
+            "The argument of function should be primative" + 
+             ", but \"" + arguments[i].getTypeName() + "\" is found");
+      }
+    }
+    
+    return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
+  }
+
+  /**
+  * @return 0 on success; 1 if the SQL statement fails; 2 if the connection cannot be established
+  */
+  public Object evaluate(DeferredObject[] arguments) throws HiveException {
+    
+    url = ((StringObjectInspector)argumentOI[0]).getPrimitiveJavaObject(arguments[0].get());
+    user = ((StringObjectInspector)argumentOI[1]).getPrimitiveJavaObject(arguments[1].get()) ;
+    pass = ((StringObjectInspector)argumentOI[2]).getPrimitiveJavaObject(arguments[2].get()) ;
+
+    try {   
+      connection = DriverManager.getConnection(url, user, pass);
+    } catch (SQLException ex) { 
+      LOG.error("Driver loading or connection issue", ex);
+      result.set(2);
+    }
+    
+    if (connection != null){
+      try {
+
+        PreparedStatement ps = connection.prepareStatement(
+        ((StringObjectInspector)argumentOI[3]).getPrimitiveJavaObject(arguments[3].get()) );
+        for (int i = 4; i < arguments.length; ++i) {
+          PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) argumentOI[i]);
+          ps.setObject(i - 3, poi.getPrimitiveJavaObject(arguments[i].get()));
+        }
+        ps.execute();
+        ps.close();
+        result.set(0);
+      } catch (SQLException e) {
+        LOG.error("Underlying SQL exception", e);
+        result.set(1);
+      } finally {
+        try {
+          connection.close();
+        } catch (Exception ex) {
+          LOG.error("Underlying SQL exception during close", ex);
+        }
+      }
+    } 
+
+    return result;
+  }
+
+  
+  public String getDisplayString(String[] children) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("dboutput(");
+    if (children.length > 0) {
+      sb.append(children[0]);
+      for(int i=1; i<children.length; i++) {
+        sb.append(",");
+        sb.append(children[i]);
+      }
+    }
+    sb.append(")");
+    return sb.toString();
+  }
+
+}
+

Added: hadoop/hive/trunk/contrib/src/test/queries/clientpositive/dboutput.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/test/queries/clientpositive/dboutput.q?rev=809778&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/test/queries/clientpositive/dboutput.q (added)
+++ hadoop/hive/trunk/contrib/src/test/queries/clientpositive/dboutput.q Mon Aug 31 23:03:33 2009
@@ -0,0 +1,57 @@
+add jar ../build/contrib/hive_contrib.jar;
+
+CREATE TEMPORARY FUNCTION dboutput AS 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput';
+
+set mapred.map.tasks.speculative.execution=false;
+set mapred.reduce.tasks.speculative.execution=false;
+set mapred.map.tasks=1;
+set mapred.reduce.tasks=1;
+
+ADD JAR ../lib/derby.jar ;
+
+EXPLAIN FROM src
+
+SELECT dboutput ( 'jdbc:derby:../build/test_dboutput_db\;create=true','','',
+'CREATE TABLE app_info ( kkey VARCHAR(255) NOT NULL, vvalue VARCHAR(255) NOT NULL, UNIQUE(kkey))' ),
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','a'),
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','b')
+
+limit 1;
+
+
+FROM src 
+
+SELECT dboutput ( 'jdbc:derby:../build/test_dboutput_db\;create=true','','',
+'CREATE TABLE app_info ( kkey INTEGER NOT NULL, vvalue VARCHAR(255) NOT NULL, UNIQUE(kkey))' ),
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','a'),
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','b')
+
+limit 1;
+
+EXPLAIN SELECT
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)',key,value)
+
+FROM src WHERE key < 10;
+
+
+SELECT
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)',key,value)
+
+FROM src WHERE key < 10;
+
+dfs -rmr ../build/test_dboutput_db;
+dfs -rmr derby.log;
+
+DROP TEMPORARY FUNCTION dboutput;

Added: hadoop/hive/trunk/contrib/src/test/results/clientpositive/dboutput.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/test/results/clientpositive/dboutput.q.out?rev=809778&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/test/results/clientpositive/dboutput.q.out (added)
+++ hadoop/hive/trunk/contrib/src/test/results/clientpositive/dboutput.q.out Mon Aug 31 23:03:33 2009
@@ -0,0 +1,130 @@
+query: CREATE TEMPORARY FUNCTION dboutput AS 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput'
+query: EXPLAIN FROM src
+
+SELECT dboutput ( 'jdbc:derby:../build/test_dboutput_db;create=true','','',
+'CREATE TABLE app_info ( kkey VARCHAR(255) NOT NULL, vvalue VARCHAR(255) NOT NULL, UNIQUE(kkey))' ),
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','a'),
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','b')
+
+limit 1
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION dboutput 'jdbc:derby:../build/test_dboutput_db;create=true' '' '' 'CREATE TABLE app_info ( kkey VARCHAR(255) NOT NULL, vvalue VARCHAR(255) NOT NULL, UNIQUE(kkey))')) (TOK_SELEXPR (TOK_FUNCTION dboutput 'jdbc:derby:../build/test_dboutput_db' '' '' 'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)' '20' 'a')) (TOK_SELEXPR (TOK_FUNCTION dboutput 'jdbc:derby:../build/test_dboutput_db' '' '' 'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)' '20' 'b'))) (TOK_LIMIT 1)))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        src 
+          TableScan
+            alias: src
+            Select Operator
+              expressions:
+                    expr: dboutput('jdbc:derby:../build/test_dboutput_db;create=true','','','CREATE TABLE app_info ( kkey VARCHAR(255) NOT NULL, vvalue VARCHAR(255) NOT NULL, UNIQUE(kkey))')
+                    type: int
+                    expr: dboutput('jdbc:derby:../build/test_dboutput_db','','','INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','a')
+                    type: int
+                    expr: dboutput('jdbc:derby:../build/test_dboutput_db','','','INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','b')
+                    type: int
+              outputColumnNames: _col0, _col1, _col2
+              Limit
+                File Output Operator
+                  compressed: false
+                  GlobalTableId: 0
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 1
+
+
+query: FROM src 
+
+SELECT dboutput ( 'jdbc:derby:../build/test_dboutput_db;create=true','','',
+'CREATE TABLE app_info ( kkey INTEGER NOT NULL, vvalue VARCHAR(255) NOT NULL, UNIQUE(kkey))' ),
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','a'),
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','b')
+
+limit 1
+Input: default/src
+Output: file:/home/ecapriolo/hive-20/hive-trunk/build/ql/tmp/546437302/10000
+0	0	1
+query: EXPLAIN SELECT
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)',key,value)
+
+FROM src WHERE key < 10
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION dboutput 'jdbc:derby:../build/test_dboutput_db' '' '' 'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)' (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value)))) (TOK_WHERE (< (TOK_TABLE_OR_COL key) 10))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        src 
+          TableScan
+            alias: src
+            Filter Operator
+              predicate:
+                  expr: (key < 10)
+                  type: boolean
+              Filter Operator
+                predicate:
+                    expr: (key < 10)
+                    type: boolean
+                Select Operator
+                  expressions:
+                        expr: dboutput('jdbc:derby:../build/test_dboutput_db','','','INSERT INTO app_info (kkey,vvalue) VALUES (?,?)',key,value)
+                        type: int
+                  outputColumnNames: _col0
+                  File Output Operator
+                    compressed: false
+                    GlobalTableId: 0
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+query: SELECT
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)',key,value)
+
+FROM src WHERE key < 10
+Input: default/src
+Output: file:/home/ecapriolo/hive-20/hive-trunk/build/ql/tmp/1720943059/10000
+0
+0
+0
+1
+1
+0
+1
+0
+1
+0
+Deleted file:/home/ecapriolo/hive-20/hive-trunk/build/test_dboutput_db
+Deleted file:/home/ecapriolo/hive-20/hive-trunk/contrib/derby.log
+query: DROP TEMPORARY FUNCTION dboutput