You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2009/09/01 01:03:33 UTC
svn commit: r809778 - in /hadoop/hive/trunk: ./
contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/
contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/
contrib/src/test/queries/clientpositive/
contrib/src/test/results/clientposi...
Author: namit
Date: Mon Aug 31 23:03:33 2009
New Revision: 809778
URL: http://svn.apache.org/viewvc?rev=809778&view=rev
Log:
HIVE-645. UDF to run JDBC
(Edward Capriolo via namit)
Added:
hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/
hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/
hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFDBOutput.java
hadoop/hive/trunk/contrib/src/test/queries/clientpositive/dboutput.q
hadoop/hive/trunk/contrib/src/test/results/clientpositive/dboutput.q.out
Modified:
hadoop/hive/trunk/CHANGES.txt
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=809778&r1=809777&r2=809778&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Mon Aug 31 23:03:33 2009
@@ -30,6 +30,9 @@
HIVE-792. Support "add archive" in addition to "add file" and "add jar".
(He Yongqiang via zshao)
+ HIVE-645. UDF to run JDBC
+ (Edward Capriolo via namit)
+
IMPROVEMENTS
HIVE-760. Add version info to META-INF/MANIFEST.MF.
Added: hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFDBOutput.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFDBOutput.java?rev=809778&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFDBOutput.java (added)
+++ hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFDBOutput.java Mon Aug 31 23:03:33 2009
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.genericudf.example;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hive.ql.udf.generic.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.io.IntWritable;
+
+/**
+* GenericUDFDBOutput is designed to output data directly from Hive to a JDBC datastore.
+* This UDF is useful for exporting small to medium summaries that have a unique key.
+*
+* Due to the nature of hadoop, individual mappers, reducers or entire jobs can fail.
+* If a failure occurs a mapper or reducer may be retried. This UDF has no way of
+* detecting failures or rolling back a transaction. Consequently, you should
+* only use this to export to a table with a unique key. The unique key should safeguard
+* against duplicate data.
+*
+* Use hive's ADD JAR feature to add your JDBC Driver to the distributed cache,
+* otherwise GenericUDFDBOutput will fail.
+*/
+public class GenericUDFDBOutput extends GenericUDF {
+ private static Log LOG = LogFactory.getLog(GenericUDFDBOutput.class.getName());
+
+ ObjectInspector[] argumentOI;
+ GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;
+ Connection connection = null;
+ private String url;
+ private String user;
+ private String pass;
+ private IntWritable result = new IntWritable(-1);
+ /**
+ * @param arguments
+ * argument 0 is the JDBC connection string
+ * argument 1 is the user name
+ * argument 2 is the password
+ * argument 3 is an SQL query to be used in the PreparedStatement
+ * argument (4-n) The remaining arguments must be primitive and are passed to the PreparedStatement object
+ */
+ public ObjectInspector initialize(ObjectInspector[] arguments)
+ throws UDFArgumentTypeException {
+ this.argumentOI = arguments;
+
+ //this should be connection url,username,password,query,column1[,columnn]*
+ for (int i=0;i<4;i++){
+ if ( arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
+ PrimitiveObjectInspector poi = ((PrimitiveObjectInspector)arguments[i]);
+
+ if (! (poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)){
+ throw new UDFArgumentTypeException(i,
+ "The argument of function should be \"" + Constants.STRING_TYPE_NAME
+ + "\", but \"" + arguments[i].getTypeName() + "\" is found");
+ }
+ }
+ }
+ for (int i=4;i<arguments.length;i++){
+ if ( arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {
+ throw new UDFArgumentTypeException(i,
+ "The argument of function should be primative" +
+ ", but \"" + arguments[i].getTypeName() + "\" is found");
+ }
+ }
+
+ return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
+ }
+
+ /**
+ * @return 0 on success -1 on failure
+ */
+ public Object evaluate(DeferredObject[] arguments) throws HiveException {
+
+ url = ((StringObjectInspector)argumentOI[0]).getPrimitiveJavaObject(arguments[0].get());
+ user = ((StringObjectInspector)argumentOI[1]).getPrimitiveJavaObject(arguments[1].get()) ;
+ pass = ((StringObjectInspector)argumentOI[2]).getPrimitiveJavaObject(arguments[2].get()) ;
+
+ try {
+ connection = DriverManager.getConnection(url, user, pass);
+ } catch (SQLException ex) {
+ LOG.error("Driver loading or connection issue", ex);
+ result.set(2);
+ }
+
+ if (connection != null){
+ try {
+
+ PreparedStatement ps = connection.prepareStatement(
+ ((StringObjectInspector)argumentOI[3]).getPrimitiveJavaObject(arguments[3].get()) );
+ for (int i = 4; i < arguments.length; ++i) {
+ PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) argumentOI[i]);
+ ps.setObject(i - 3, poi.getPrimitiveJavaObject(arguments[i].get()));
+ }
+ ps.execute();
+ ps.close();
+ result.set(0);
+ } catch (SQLException e) {
+ LOG.error("Underlying SQL exception", e);
+ result.set(1);
+ } finally {
+ try {
+ connection.close();
+ } catch (Exception ex) {
+ LOG.error("Underlying SQL exception during close", ex);
+ }
+ }
+ }
+
+ return result;
+ }
+
+
+ public String getDisplayString(String[] children) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("dboutput(");
+ if (children.length > 0) {
+ sb.append(children[0]);
+ for(int i=1; i<children.length; i++) {
+ sb.append(",");
+ sb.append(children[i]);
+ }
+ }
+ sb.append(")");
+ return sb.toString();
+ }
+
+}
+
Added: hadoop/hive/trunk/contrib/src/test/queries/clientpositive/dboutput.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/test/queries/clientpositive/dboutput.q?rev=809778&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/test/queries/clientpositive/dboutput.q (added)
+++ hadoop/hive/trunk/contrib/src/test/queries/clientpositive/dboutput.q Mon Aug 31 23:03:33 2009
@@ -0,0 +1,57 @@
+add jar ../build/contrib/hive_contrib.jar;
+
+CREATE TEMPORARY FUNCTION dboutput AS 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput';
+
+set mapred.map.tasks.speculative.execution=false;
+set mapred.reduce.tasks.speculative.execution=false;
+set mapred.map.tasks=1;
+set mapred.reduce.tasks=1;
+
+ADD JAR ../lib/derby.jar ;
+
+EXPLAIN FROM src
+
+SELECT dboutput ( 'jdbc:derby:../build/test_dboutput_db\;create=true','','',
+'CREATE TABLE app_info ( kkey VARCHAR(255) NOT NULL, vvalue VARCHAR(255) NOT NULL, UNIQUE(kkey))' ),
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','a'),
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','b')
+
+limit 1;
+
+
+FROM src
+
+SELECT dboutput ( 'jdbc:derby:../build/test_dboutput_db\;create=true','','',
+'CREATE TABLE app_info ( kkey INTEGER NOT NULL, vvalue VARCHAR(255) NOT NULL, UNIQUE(kkey))' ),
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','a'),
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','b')
+
+limit 1;
+
+EXPLAIN SELECT
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)',key,value)
+
+FROM src WHERE key < 10;
+
+
+SELECT
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)',key,value)
+
+FROM src WHERE key < 10;
+
+dfs -rmr ../build/test_dboutput_db;
+dfs -rmr derby.log;
+
+DROP TEMPORARY FUNCTION dboutput;
Added: hadoop/hive/trunk/contrib/src/test/results/clientpositive/dboutput.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/test/results/clientpositive/dboutput.q.out?rev=809778&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/test/results/clientpositive/dboutput.q.out (added)
+++ hadoop/hive/trunk/contrib/src/test/results/clientpositive/dboutput.q.out Mon Aug 31 23:03:33 2009
@@ -0,0 +1,130 @@
+query: CREATE TEMPORARY FUNCTION dboutput AS 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput'
+query: EXPLAIN FROM src
+
+SELECT dboutput ( 'jdbc:derby:../build/test_dboutput_db;create=true','','',
+'CREATE TABLE app_info ( kkey VARCHAR(255) NOT NULL, vvalue VARCHAR(255) NOT NULL, UNIQUE(kkey))' ),
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','a'),
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','b')
+
+limit 1
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION dboutput 'jdbc:derby:../build/test_dboutput_db;create=true' '' '' 'CREATE TABLE app_info ( kkey VARCHAR(255) NOT NULL, vvalue VARCHAR(255) NOT NULL, UNIQUE(kkey))')) (TOK_SELEXPR (TOK_FUNCTION dboutput 'jdbc:derby:../build/test_dboutput_db' '' '' 'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)' '20' 'a')) (TOK_SELEXPR (TOK_FUNCTION dboutput 'jdbc:derby:../build/test_dboutput_db' '' '' 'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)' '20' 'b'))) (TOK_LIMIT 1)))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ src
+ TableScan
+ alias: src
+ Select Operator
+ expressions:
+ expr: dboutput('jdbc:derby:../build/test_dboutput_db;create=true','','','CREATE TABLE app_info ( kkey VARCHAR(255) NOT NULL, vvalue VARCHAR(255) NOT NULL, UNIQUE(kkey))')
+ type: int
+ expr: dboutput('jdbc:derby:../build/test_dboutput_db','','','INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','a')
+ type: int
+ expr: dboutput('jdbc:derby:../build/test_dboutput_db','','','INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','b')
+ type: int
+ outputColumnNames: _col0, _col1, _col2
+ Limit
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 1
+
+
+query: FROM src
+
+SELECT dboutput ( 'jdbc:derby:../build/test_dboutput_db;create=true','','',
+'CREATE TABLE app_info ( kkey INTEGER NOT NULL, vvalue VARCHAR(255) NOT NULL, UNIQUE(kkey))' ),
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','a'),
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)','20','b')
+
+limit 1
+Input: default/src
+Output: file:/home/ecapriolo/hive-20/hive-trunk/build/ql/tmp/546437302/10000
+0 0 1
+query: EXPLAIN SELECT
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)',key,value)
+
+FROM src WHERE key < 10
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION dboutput 'jdbc:derby:../build/test_dboutput_db' '' '' 'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)' (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value)))) (TOK_WHERE (< (TOK_TABLE_OR_COL key) 10))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ src
+ TableScan
+ alias: src
+ Filter Operator
+ predicate:
+ expr: (key < 10)
+ type: boolean
+ Filter Operator
+ predicate:
+ expr: (key < 10)
+ type: boolean
+ Select Operator
+ expressions:
+ expr: dboutput('jdbc:derby:../build/test_dboutput_db','','','INSERT INTO app_info (kkey,vvalue) VALUES (?,?)',key,value)
+ type: int
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+query: SELECT
+
+dboutput('jdbc:derby:../build/test_dboutput_db','','',
+'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)',key,value)
+
+FROM src WHERE key < 10
+Input: default/src
+Output: file:/home/ecapriolo/hive-20/hive-trunk/build/ql/tmp/1720943059/10000
+0
+0
+0
+1
+1
+0
+1
+0
+1
+0
+Deleted file:/home/ecapriolo/hive-20/hive-trunk/build/test_dboutput_db
+Deleted file:/home/ecapriolo/hive-20/hive-trunk/contrib/derby.log
+query: DROP TEMPORARY FUNCTION dboutput