You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:18 UTC

[17/51] [partial] Initial commit

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/parse/TraverseNoParseNodeVisitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/parse/TraverseNoParseNodeVisitor.java b/src/main/java/org/apache/phoenix/parse/TraverseNoParseNodeVisitor.java
new file mode 100644
index 0000000..899df6f
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/parse/TraverseNoParseNodeVisitor.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.util.List;
+
+
+/**
+ * Base visitor that traverses into no parse nodes.
+ * <p>
+ * Every <code>visitEnter</code> callback answers <code>false</code> and every
+ * <code>visit</code> / <code>visitLeave</code> callback answers
+ * <code>null</code>. The class is abstract, so concrete visitors extend it and
+ * override the callbacks for the node types they are interested in.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class TraverseNoParseNodeVisitor<T> extends BaseParseNodeVisitor<T> {
+
+    // ---- nodes visited through a single visit() callback ----
+
+    @Override
+    public T visit(ColumnParseNode node) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public T visit(LiteralParseNode node) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public T visit(BindParseNode node) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public T visit(WildcardParseNode node) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public T visit(FamilyWildcardParseNode node) throws SQLException {
+        return null;
+    }
+
+    // ---- AND / OR / NOT ----
+
+    @Override
+    public boolean visitEnter(AndParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(AndParseNode node, List<T> ignored) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(OrParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(OrParseNode node, List<T> ignored) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(NotParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(NotParseNode node, List<T> ignored) throws SQLException {
+        return null;
+    }
+
+    // ---- comparison, LIKE, BETWEEN, IN, IS NULL ----
+
+    @Override
+    public boolean visitEnter(ComparisonParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(ComparisonParseNode node, List<T> ignored) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(LikeParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(LikeParseNode node, List<T> ignored) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(BetweenParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(BetweenParseNode node, List<T> ignored) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(InListParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(InListParseNode node, List<T> ignored) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(IsNullParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(IsNullParseNode node, List<T> ignored) throws SQLException {
+        return null;
+    }
+
+    // ---- function, CASE, CAST, row value constructor ----
+
+    @Override
+    public boolean visitEnter(FunctionParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(FunctionParseNode node, List<T> ignored) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(CaseParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(CaseParseNode node, List<T> ignored) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(CastParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(CastParseNode node, List<T> ignored) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(RowValueConstructorParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(RowValueConstructorParseNode node, List<T> ignored) throws SQLException {
+        return null;
+    }
+
+    // ---- add, subtract, multiply, divide, string concatenation ----
+
+    @Override
+    public boolean visitEnter(AddParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(AddParseNode node, List<T> ignored) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(SubtractParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(SubtractParseNode node, List<T> ignored) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(MultiplyParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(MultiplyParseNode node, List<T> ignored) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(DivideParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(DivideParseNode node, List<T> ignored) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(StringConcatParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(StringConcatParseNode node, List<T> ignored) throws SQLException {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/parse/UnaryParseNode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/parse/UnaryParseNode.java b/src/main/java/org/apache/phoenix/parse/UnaryParseNode.java
new file mode 100644
index 0000000..356d047
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/parse/UnaryParseNode.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.util.Collections;
+
+/**
+ * Base class for SQL expression nodes that have exactly one child expression.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class UnaryParseNode extends CompoundParseNode {
+    /**
+     * @param expr the single child expression; it is handed to the superclass
+     *            wrapped in an immutable one-element list
+     */
+    UnaryParseNode(ParseNode expr) {
+        super(Collections.<ParseNode>singletonList(expr));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/parse/UnsupportedAllParseNodeVisitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/parse/UnsupportedAllParseNodeVisitor.java b/src/main/java/org/apache/phoenix/parse/UnsupportedAllParseNodeVisitor.java
new file mode 100644
index 0000000..60f7edf
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/parse/UnsupportedAllParseNodeVisitor.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.List;
+
+
+/**
+ * Visitor that throws {@link SQLFeatureNotSupportedException} from every
+ * <code>visit</code>, <code>visitEnter</code> and <code>visitLeave</code>
+ * callback declared here; the exception message is the node's
+ * <code>toString()</code>.  Meant to be sub-classed for the case of a small
+ * subset of nodes being supported, in which case only those applicable
+ * methods would be overridden.
+ * <p>
+ * The element-list hooks do not throw: {@link #newElementList(int)} returns
+ * <code>null</code> and {@link #addElement(List, Object)} does nothing.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class UnsupportedAllParseNodeVisitor<E> extends BaseParseNodeVisitor<E> {
+
+    /**
+     * Builds the exception thrown by every callback in this class.
+     *
+     * @param node the node that was visited; its <code>toString()</code>
+     *            becomes the exception message
+     * @return a new exception, to be thrown by the caller
+     */
+    private static SQLFeatureNotSupportedException unsupported(Object node) {
+        return new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visit(ColumnParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visit(LiteralParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visit(BindParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visit(WildcardParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visit(FamilyWildcardParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public boolean visitEnter(AndParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visitLeave(AndParseNode node, List<E> l) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public boolean visitEnter(OrParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visitLeave(OrParseNode node, List<E> l) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public boolean visitEnter(FunctionParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visitLeave(FunctionParseNode node, List<E> l) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public boolean visitEnter(ComparisonParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visitLeave(ComparisonParseNode node, List<E> l) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public boolean visitEnter(BetweenParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visitLeave(BetweenParseNode node, List<E> l) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public boolean visitEnter(LikeParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visitLeave(LikeParseNode node, List<E> l) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public boolean visitEnter(NotParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visitLeave(NotParseNode node, List<E> l) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public boolean visitEnter(CastParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visitLeave(CastParseNode node, List<E> l) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public boolean visitEnter(InListParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visitLeave(InListParseNode node, List<E> l) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public boolean visitEnter(IsNullParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visitLeave(IsNullParseNode node, List<E> l) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public boolean visitEnter(AddParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visitLeave(AddParseNode node, List<E> l) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public boolean visitEnter(SubtractParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visitLeave(SubtractParseNode node, List<E> l) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public boolean visitEnter(MultiplyParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visitLeave(MultiplyParseNode node, List<E> l) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public boolean visitEnter(DivideParseNode node) throws SQLException {
+        throw unsupported(node);
+    }
+
+    @Override
+    public E visitLeave(DivideParseNode node, List<E> l) throws SQLException {
+        throw unsupported(node);
+    }
+
+    /**
+     * No element list is ever created by this visitor.
+     *
+     * @return always <code>null</code>
+     */
+    @Override
+    public List<E> newElementList(int size) {
+        return null;
+    }
+
+    /** Intentionally a no-op: this visitor collects no elements. */
+    @Override
+    public void addElement(List<E> a, E element) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/parse/UpsertStatement.java b/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
new file mode 100644
index 0000000..777a64c
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Parsed form of an UPSERT statement against a single table: the target
+ * columns, the value expressions and/or source SELECT, and any hint.
+ */
+public class UpsertStatement extends SingleTableSQLStatement {
+    private final HintNode hint;
+    private final List<ColumnName> columns;
+    private final List<ParseNode> values;
+    private final SelectStatement select;
+
+    /**
+     * @param table the table being upserted into
+     * @param hint the statement hint; <code>null</code> is replaced by
+     *            {@link HintNode#EMPTY_HINT_NODE}
+     * @param columns the target columns; <code>null</code> is replaced by an
+     *            empty list
+     * @param values the value expressions, stored as given (may be
+     *            <code>null</code>)
+     * @param select the source SELECT, stored as given (may be
+     *            <code>null</code>)
+     * @param bindCount passed through to the superclass
+     */
+    public UpsertStatement(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount) {
+        super(table, bindCount);
+        if (hint == null) {
+            this.hint = HintNode.EMPTY_HINT_NODE;
+        } else {
+            this.hint = hint;
+        }
+        if (columns == null) {
+            this.columns = Collections.<ColumnName>emptyList();
+        } else {
+            this.columns = columns;
+        }
+        this.values = values;
+        this.select = select;
+    }
+
+    /** @return the hint; never <code>null</code> */
+    public HintNode getHint() {
+        return hint;
+    }
+
+    /** @return the target columns; never <code>null</code>, possibly empty */
+    public List<ColumnName> getColumns() {
+        return columns;
+    }
+
+    /** @return the value expressions exactly as passed to the constructor */
+    public List<ParseNode> getValues() {
+        return values;
+    }
+
+    /** @return the source SELECT exactly as passed to the constructor */
+    public SelectStatement getSelect() {
+        return select;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/parse/WildcardParseNode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/parse/WildcardParseNode.java b/src/main/java/org/apache/phoenix/parse/WildcardParseNode.java
new file mode 100644
index 0000000..a0e3b6e
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/parse/WildcardParseNode.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+
+
+
+/**
+ * Parse node for the select-all-columns wildcard (<code>*</code>) in the
+ * SELECT clause of SQL.
+ * <p>
+ * The constructor is private, so only the two shared instances
+ * {@link #INSTANCE} and {@link #REWRITE_INSTANCE} exist; they differ only in
+ * the value reported by {@link #isRewrite()}.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class WildcardParseNode extends TerminalParseNode {
+    /** Shared wildcard node whose {@link #isRewrite()} is <code>false</code>. */
+    public static final WildcardParseNode INSTANCE = new WildcardParseNode(false);
+    /** Shared wildcard node whose {@link #isRewrite()} is <code>true</code>. */
+    public static final WildcardParseNode REWRITE_INSTANCE = new WildcardParseNode(true);
+
+    private final boolean isRewrite;
+
+    private WildcardParseNode(boolean rewrite) {
+        isRewrite = rewrite;
+    }
+
+    /** @return the flag this instance was created with */
+    public boolean isRewrite() {
+        return isRewrite;
+    }
+
+    /** Dispatches to the visitor's <code>visit(WildcardParseNode)</code> callback. */
+    @Override
+    public <T> T accept(ParseNodeVisitor<T> visitor) throws SQLException {
+        return visitor.visit(this);
+    }
+
+    /** @return the literal <code>*</code>, regardless of the rewrite flag */
+    @Override
+    public String toString() {
+        return "*";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java b/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
new file mode 100644
index 0000000..eff0f87
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+
+import org.apache.phoenix.pig.hadoop.PhoenixOutputFormat;
+import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+
+/**
+ * StoreFunc that uses Phoenix to store data into HBase.
+ * 
+ * Example usage: A = load 'testdata' as (a:chararray, b:chararray, c:chararray,
+ * d:chararray, e: datetime); STORE A into 'hbase://CORE.ENTITY_HISTORY' using
+ * org.apache.bdaas.PhoenixHBaseStorage('localhost','-batchSize 5000');
+ * 
+ * The above reads a file 'testdata' and writes the elements to HBase. First
+ * argument to this StoreFunc is the server, the 2nd argument is the batch size
+ * for upserts via Phoenix.
+ * 
+ * Note that Pig types must be in sync with the target Phoenix data types. This
+ * StoreFunc tries best to cast based on input Pig types and target Phoenix data
+ * types, but it is recommended to supply appropriate schema.
+ * 
+ * This is only a STORE implementation. LoadFunc coming soon.
+ * 
+ * @author pkommireddi
+ * 
+ */
+@SuppressWarnings("rawtypes")
+public class PhoenixHBaseStorage implements StoreFuncInterface {
+
+	private PhoenixPigConfiguration config;
+	private String tableName;
+	private RecordWriter<NullWritable, PhoenixRecord> writer;
+	private String contextSignature = null;
+	private ResourceSchema schema;	
+	private long batchSize;
+	private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat();
+
+	// Set of options permitted
+	private final static Options validOptions = new Options();
+	private final static CommandLineParser parser = new GnuParser();
+	private final static String SCHEMA = "_schema";
+
+	private final CommandLine configuredOptions;
+	private final String server;
+
+	public PhoenixHBaseStorage(String server) throws ParseException {
+		this(server, null);
+	}
+
+	public PhoenixHBaseStorage(String server, String optString)
+			throws ParseException {
+		populateValidOptions();
+		this.server = server;
+
+		String[] optsArr = optString == null ? new String[0] : optString.split(" ");
+		try {
+			configuredOptions = parser.parse(validOptions, optsArr);
+		} catch (ParseException e) {
+			HelpFormatter formatter = new HelpFormatter();
+			formatter.printHelp("[-batchSize]", validOptions);
+			throw e;
+		}
+
+		batchSize = Long.parseLong(configuredOptions.getOptionValue("batchSize"));
+	}
+
+	private static void populateValidOptions() {
+		validOptions.addOption("batchSize", true, "Specify upsert batch size");
+	}
+
+	/**
+	 * Returns UDFProperties based on <code>contextSignature</code>.
+	 */
+	private Properties getUDFProperties() {
+		return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] { contextSignature });
+	}
+
+	
+	/**
+	 * Parse the HBase table name and configure job
+	 */
+	@Override
+	public void setStoreLocation(String location, Job job) throws IOException {
+		String prefix = "hbase://";
+		if (location.startsWith(prefix)) {
+			tableName = location.substring(prefix.length());
+		}
+		config = new PhoenixPigConfiguration(job.getConfiguration());
+		config.configure(server, tableName, batchSize);
+
+		String serializedSchema = getUDFProperties().getProperty(contextSignature + SCHEMA);
+		if (serializedSchema != null) {
+			schema = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+    @Override
+	public void prepareToWrite(RecordWriter writer) throws IOException {
+		this.writer =writer;
+	}
+
+	@Override
+	public void putNext(Tuple t) throws IOException {
+        ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();      
+        
+        PhoenixRecord record = new PhoenixRecord(fieldSchemas);
+        
+        for(int i=0; i<t.size(); i++) {
+        	record.add(t.get(i));
+        }
+        
+		try {
+			writer.write(null, record);
+		} catch (InterruptedException e) {
+			throw new RuntimeException(e);
+		}
+        
+	}
+
+	@Override
+	public void setStoreFuncUDFContextSignature(String signature) {
+        this.contextSignature = signature;
+	}
+
+	@Override
+	public void cleanupOnFailure(String location, Job job) throws IOException {
+	}
+
+	@Override
+	public void cleanupOnSuccess(String location, Job job) throws IOException {
+	}
+
+	@Override
+	public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+		return location;
+	}
+
+	@Override
+	public OutputFormat getOutputFormat() throws IOException {
+		return outputFormat;
+	}
+
+	@Override
+	public void checkSchema(ResourceSchema s) throws IOException {
+		schema = s;
+		getUDFProperties().setProperty(contextSignature + SCHEMA, ObjectSerializer.serialize(schema));
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java b/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
new file mode 100644
index 0000000..f0bf51c
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pig;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.QueryUtil;
+
+/**
+ * A container for configuration to be used with {@link PhoenixHBaseStorage}
+ * 
+ * @author pkommireddi
+ * 
+ */
+public class PhoenixPigConfiguration {
+	
+	private static final Log LOG = LogFactory.getLog(PhoenixPigConfiguration.class);
+	
+	/**
+	 * Speculative execution of Map tasks
+	 */
+	public static final String MAP_SPECULATIVE_EXEC = "mapred.map.tasks.speculative.execution";
+
+	/**
+	 * Speculative execution of Reduce tasks
+	 */
+	public static final String REDUCE_SPECULATIVE_EXEC = "mapred.reduce.tasks.speculative.execution";
+	
+	public static final String SERVER_NAME = "phoenix.hbase.server.name";
+	
+	public static final String TABLE_NAME = "phoenix.hbase.table.name";
+	
+	public static final String UPSERT_STATEMENT = "phoenix.upsert.stmt";
+	
+	public static final String UPSERT_BATCH_SIZE = "phoenix.upsert.batch.size";
+	
+	public static final long DEFAULT_UPSERT_BATCH_SIZE = 1000;
+	
+	private final Configuration conf;
+	
+	private Connection conn;
+	private List<ColumnInfo> columnMetadataList;
+		
+	public PhoenixPigConfiguration(Configuration conf) {
+		this.conf = conf;
+	}
+	
+	public void configure(String server, String tableName, long batchSize) {
+		conf.set(SERVER_NAME, server);
+		conf.set(TABLE_NAME, tableName);
+		conf.setLong(UPSERT_BATCH_SIZE, batchSize);
+		conf.setBoolean(MAP_SPECULATIVE_EXEC, false);
+		conf.setBoolean(REDUCE_SPECULATIVE_EXEC, false);
+	}
+	
+	/**
+	 * Creates a {@link Connection} with autoCommit set to false.
+	 * @throws SQLException
+	 */
+	public Connection getConnection() throws SQLException {
+		Properties props = new Properties();
+		conn = DriverManager.getConnection(QueryUtil.getUrl(this.conf.get(SERVER_NAME)), props).unwrap(PhoenixConnection.class);
+		conn.setAutoCommit(false);
+		
+		setup(conn);
+		
+		return conn;
+	}
+	
+	/**
+	 * This method creates the Upsert statement and the Column Metadata
+	 * for the Pig query using {@link PhoenixHBaseStorage}. It also 
+	 * determines the batch size based on user provided options.
+	 * 
+	 * @param conn
+	 * @throws SQLException
+	 */
+	public void setup(Connection conn) throws SQLException {
+		// Reset batch size
+		long batchSize = getBatchSize() <= 0 ? ((PhoenixConnection) conn).getMutateBatchSize() : getBatchSize();
+		conf.setLong(UPSERT_BATCH_SIZE, batchSize);
+		
+		if (columnMetadataList == null) {
+			columnMetadataList = new ArrayList<ColumnInfo>();
+			String[] tableMetadata = getTableMetadata(getTableName());
+			ResultSet rs = conn.getMetaData().getColumns(null, tableMetadata[0], tableMetadata[1], null);
+			while (rs.next()) {
+				columnMetadataList.add(new ColumnInfo(rs.getString(QueryUtil.COLUMN_NAME_POSITION), rs.getInt(QueryUtil.DATA_TYPE_POSITION)));
+			}
+		}
+		
+		// Generating UPSERT statement without column name information.
+		String upsertStmt = QueryUtil.constructUpsertStatement(null, getTableName(), columnMetadataList.size());
+		LOG.info("Phoenix Upsert Statement: " + upsertStmt);
+		conf.set(UPSERT_STATEMENT, upsertStmt);
+	}
+	
+	public String getUpsertStatement() {
+		return conf.get(UPSERT_STATEMENT);
+	}
+
+	public long getBatchSize() {
+		return conf.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
+	}
+
+
+	public String getServer() {
+		return conf.get(SERVER_NAME);
+	}
+
+	public List<ColumnInfo> getColumnMetadataList() {
+		return columnMetadataList;
+	}
+	
+	public String getTableName() {
+		return conf.get(TABLE_NAME);
+	}
+
+	private String[] getTableMetadata(String table) {
+		String[] schemaAndTable = table.split("\\.");
+		assert schemaAndTable.length >= 1;
+
+		if (schemaAndTable.length == 1) {
+			return new String[] { "", schemaAndTable[0] };
+		}
+
+		return new String[] { schemaAndTable[0], schemaAndTable[1] };
+	}
+
+	
+	public Configuration getConfiguration() {
+		return this.conf;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/pig/TypeUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/pig/TypeUtil.java b/src/main/java/org/apache/phoenix/pig/TypeUtil.java
new file mode 100644
index 0000000..4bb2637
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/pig/TypeUtil.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pig;
+
+import java.io.IOException;
+import java.sql.*;
+
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.joda.time.DateTime;
+
+import org.apache.phoenix.schema.PDataType;
+
+public class TypeUtil {
+	
+	private static final Utf8StorageConverter utf8Converter = new Utf8StorageConverter();
+	
+	/**
+	 * This method returns the most appropriate PDataType associated with 
+	 * the incoming Pig type. Note for Pig DataType DATETIME, returns DATE as 
+	 * inferredSqlType. 
+	 * 
+	 * This is later used to make a cast to targetPhoenixType accordingly. See
+	 * {@link #castPigTypeToPhoenix(Object, byte, PDataType)}
+	 * 
+	 * @param obj
+	 * @return PDataType
+	 */
+	public static PDataType getType(Object obj, byte type) {
+		if (obj == null) {
+			return null;
+		}
+	
+		PDataType sqlType;
+
+		switch (type) {
+		case DataType.BYTEARRAY:
+			sqlType = PDataType.VARBINARY;
+			break;
+		case DataType.CHARARRAY:
+			sqlType = PDataType.VARCHAR;
+			break;
+		case DataType.DOUBLE:
+			sqlType = PDataType.DOUBLE;
+			break;
+		case DataType.FLOAT:
+			sqlType = PDataType.FLOAT;
+			break;
+		case DataType.INTEGER:
+			sqlType = PDataType.INTEGER;
+			break;
+		case DataType.LONG:
+			sqlType = PDataType.LONG;
+			break;
+		case DataType.BOOLEAN:
+			sqlType = PDataType.BOOLEAN;
+			break;
+		case DataType.DATETIME:
+			sqlType = PDataType.DATE;
+			break;
+		default:
+			throw new RuntimeException("Unknown type " + obj.getClass().getName()
+					+ " passed to PhoenixHBaseStorage");
+		}
+
+		return sqlType;
+
+	}
+
+	/**
+	 * This method encodes a value with Phoenix data type. It begins
+	 * with checking whether an object is BINARY and makes a call to
+	 * {@link #castBytes(Object, PDataType)} to convery bytes to
+	 * targetPhoenixType
+	 * 
+	 * @param o
+	 * @param targetPhoenixType
+	 * @return Object
+	 */
+	public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) {
+		PDataType inferredPType = getType(o, objectType);
+		
+		if(inferredPType == null) {
+			return null;
+		}
+		
+		if(inferredPType == PDataType.VARBINARY && targetPhoenixType != PDataType.VARBINARY) {
+			try {
+				o = castBytes(o, targetPhoenixType);
+				inferredPType = getType(o, DataType.findType(o));
+			} catch (IOException e) {
+				throw new RuntimeException("Error while casting bytes for object " +o);
+			}
+		}
+
+		if(inferredPType == PDataType.DATE) {
+			int inferredSqlType = targetPhoenixType.getSqlType();
+
+			if(inferredSqlType == Types.DATE) {
+				return new Date(((DateTime)o).getMillis());
+			} 
+			if(inferredSqlType == Types.TIME) {
+				return new Time(((DateTime)o).getMillis());
+			}
+			if(inferredSqlType == Types.TIMESTAMP) {
+				return new Timestamp(((DateTime)o).getMillis());
+			}
+		}
+		
+		if (targetPhoenixType == inferredPType || inferredPType.isCoercibleTo(targetPhoenixType)) {
+			return inferredPType.toObject(o, targetPhoenixType);
+		}
+		
+		throw new RuntimeException(o.getClass().getName()
+				+ " cannot be coerced to "+targetPhoenixType.toString());
+	}
+	
+	/**
+	 * This method converts bytes to the target type required
+	 * for Phoenix. It uses {@link Utf8StorageConverter} for
+	 * the conversion.
+	 * 
+	 * @param o
+	 * @param targetPhoenixType
+	 * @return Object
+	 * @throws IOException
+	 */
+    public static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException {
+        byte[] bytes = ((DataByteArray)o).get();
+        
+        switch(targetPhoenixType) {
+        case CHAR:
+        case VARCHAR:
+            return utf8Converter.bytesToCharArray(bytes);
+        case UNSIGNED_INT:
+        case INTEGER:
+            return utf8Converter.bytesToInteger(bytes);
+        case BOOLEAN:
+            return utf8Converter.bytesToBoolean(bytes);
+//        case DECIMAL: not in Pig v 0.11.0, so using double for now
+//            return utf8Converter.bytesToBigDecimal(bytes);
+        case DECIMAL:
+            return utf8Converter.bytesToDouble(bytes);
+        case UNSIGNED_LONG:
+        case LONG:
+            return utf8Converter.bytesToLong(bytes);
+        case TIME:
+        case TIMESTAMP:
+        case DATE:
+        	return utf8Converter.bytesToDateTime(bytes);
+        default:
+        	return o;
+        }        
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java b/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
new file mode 100644
index 0000000..503f570
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pig.hadoop;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import org.apache.phoenix.jdbc.PhoenixStatement;
+
+/**
+ * 
+ * {@link OutputCommitter} implementation for Pig tasks using Phoenix
+ * connections to upsert to HBase
+ * 
+ * @author pkommireddi
+ *
+ */
+public class PhoenixOutputCommitter extends OutputCommitter {
+	// One logger for the class, declared the same way as in the sibling Phoenix Pig classes.
+	private static final Log LOG = LogFactory.getLog(PhoenixOutputCommitter.class);
+
+	private final PhoenixOutputFormat outputFormat;
+
+	/**
+	 * @param outputFormat the output format whose connection is committed on task completion
+	 * @throws IllegalArgumentException if {@code outputFormat} is null
+	 */
+	public PhoenixOutputCommitter(PhoenixOutputFormat outputFormat) {
+		if (outputFormat == null) {
+			throw new IllegalArgumentException("PhoenixOutputFormat must not be null.");
+		}
+		this.outputFormat = outputFormat;
+	}
+
+	/**
+	 *  TODO implement rollback functionality.
+	 *
+	 *  {@link PhoenixStatement#execute(String)} is buffered on the client, this makes
+	 *  it difficult to implement rollback as once a commit is issued it's hard to go
+	 *  back all the way to undo.
+	 *
+	 *  Currently does nothing.
+	 */
+	@Override
+	public void abortTask(TaskAttemptContext context) throws IOException {
+	}
+
+	/**
+	 * Commits and then closes the connection obtained from the output format.
+	 */
+	@Override
+	public void commitTask(TaskAttemptContext context) throws IOException {
+		commit(outputFormat.getConnection(context.getConfiguration()));
+	}
+
+	/**
+	 * Always reports that a task commit is required.
+	 */
+	@Override
+	public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+		return true;
+	}
+
+	/** No job-level setup is performed. */
+	@Override
+	public void setupJob(JobContext jobContext) throws IOException {
+	}
+
+	/** No task-level setup is performed. */
+	@Override
+	public void setupTask(TaskAttemptContext context) throws IOException {
+	}
+
+	/**
+	 * Commit a transaction on task completion, then close the connection
+	 * whether or not the commit succeeded.
+	 *
+	 * @param connection the connection to commit and close
+	 * @throws IOException if the connection is null or already closed, if its
+	 *         state cannot be checked, or if the commit fails; a failure to
+	 *         close is only logged
+	 */
+	private void commit(Connection connection) throws IOException {
+		try {
+			if (connection == null || connection.isClosed()) {
+				throw new IOException("Trying to commit a connection that is null or closed: "+ connection);
+			}
+		} catch (SQLException e) {
+			throw new IOException("Exception calling isClosed on connection", e);
+		}
+
+		try {
+			LOG.debug("Commit called on task completion");
+			connection.commit();
+		} catch (SQLException e) {
+			throw new IOException("Exception while trying to commit a connection. ", e);
+		} finally {
+			try {
+				LOG.debug("Closing connection to database on task completion");
+				connection.close();
+			} catch (SQLException e) {
+				// Best effort: a close failure must not mask the outcome of the commit.
+				LOG.warn("Exception while trying to close database connection", e);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java b/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
new file mode 100644
index 0000000..814dc33
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pig.hadoop;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import org.apache.phoenix.pig.PhoenixPigConfiguration;
+
+/**
+ * {@link OutputFormat} implementation for Phoenix
+ * 
+ * @author pkommireddi
+ *
+ */
+public class PhoenixOutputFormat extends OutputFormat<NullWritable, PhoenixRecord> {
+	private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class);
+
+	// Both fields are assigned on the first call to getConnection().
+	private Connection connection;
+	private PhoenixPigConfiguration config;
+
+	/** Performs no validation of the output specification. */
+	@Override
+	public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
+	}
+
+	/**
+	 * TODO Implement {@link OutputCommitter} to rollback in case of task failure
+	 *
+	 * @return a new {@link PhoenixOutputCommitter} bound to this output format
+	 */
+	@Override
+	public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+		return new PhoenixOutputCommitter(this);
+	}
+
+	/**
+	 * Builds a record writer on top of the connection held by this output format.
+	 *
+	 * @throws IOException if the connection cannot be obtained or the writer cannot be created
+	 */
+	@Override
+	public RecordWriter<NullWritable, PhoenixRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+		// Obtaining the connection also assigns 'config', so it has to come first.
+		final Connection conn = getConnection(context.getConfiguration());
+		try {
+			return new PhoenixRecordWriter(conn, config);
+		} catch (SQLException e) {
+			throw new IOException(e);
+		}
+	}
+
+	/**
+	 * Returns the database connection, creating it on first use. The same
+	 * instance is handed out on every later call.
+	 *
+	 * @param configuration the configuration used to build the connection on the first call
+	 * @return the connection held by this output format
+	 * @throws IOException if creating the connection fails
+	 */
+	synchronized Connection getConnection(Configuration configuration) throws IOException {
+		if (connection == null) {
+			config = new PhoenixPigConfiguration(configuration);
+			try {
+				LOG.info("Initializing new Phoenix connection...");
+				connection = config.getConnection();
+				LOG.info("Initialized Phoenix connection, autoCommit="+ connection.getAutoCommit());
+			} catch (SQLException e) {
+				throw new IOException(e);
+			}
+		}
+		return connection;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java b/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
new file mode 100644
index 0000000..2973c08
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pig.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
+
+import org.apache.phoenix.pig.TypeUtil;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
+
+/**
+ * A {@link Writable} representing a Phoenix record. This class
+ * does a type mapping and sets the value accordingly in the 
+ * {@link PreparedStatement}
+ * 
+ * @author pkommireddi
+ *
+ */
+public class PhoenixRecord implements Writable {
+
+	private final List<Object> values;
+	private final ResourceFieldSchema[] fieldSchemas;
+
+	/**
+	 * @param fieldSchemas Pig field schemas, indexed like the values added
+	 *        later; when null the Pig type of each value is looked up from the value itself
+	 */
+	public PhoenixRecord(ResourceFieldSchema[] fieldSchemas) {
+		this.values = new ArrayList<Object>();
+		this.fieldSchemas = fieldSchemas;
+	}
+
+	/** Left empty: nothing is read from the input. */
+	@Override
+	public void readFields(DataInput in) throws IOException {
+	}
+
+	/** Left empty: nothing is written to the output. */
+	@Override
+	public void write(DataOutput out) throws IOException {
+	}
+
+	/**
+	 * Binds one value per column to the statement, converted to the column's
+	 * SQL type, and then executes the statement.
+	 *
+	 * @param statement the prepared statement to populate and execute
+	 * @param columnMetadataList metadata of the target columns, in parameter order
+	 * @throws SQLException if binding a parameter or executing the statement fails
+	 */
+	public void write(PreparedStatement statement, List<ColumnInfo> columnMetadataList) throws SQLException {
+		for (int idx = 0; idx < columnMetadataList.size(); idx++) {
+			final Object pigValue = values.get(idx);
+			final int paramIndex = idx + 1; // JDBC parameters are 1-based
+
+			final byte pigType;
+			if (fieldSchemas == null) {
+				pigType = DataType.findType(pigValue);
+			} else {
+				pigType = fieldSchemas[idx].getType();
+			}
+
+			final ColumnInfo column = columnMetadataList.get(idx);
+			final Object upsertValue = convertTypeSpecificValue(pigValue, pigType, column.getSqlType());
+
+			if (upsertValue == null) {
+				statement.setNull(paramIndex, column.getSqlType());
+			} else {
+				statement.setObject(paramIndex, upsertValue, column.getSqlType());
+			}
+		}
+
+		statement.execute();
+	}
+
+	/**
+	 * Appends a value to this record.
+	 *
+	 * @param value the next column value
+	 */
+	public void add(Object value) {
+		values.add(value);
+	}
+
+	/**
+	 * Converts a Pig value to the object expected for the given SQL type.
+	 */
+	private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) {
+		return TypeUtil.castPigTypeToPhoenix(o, type, PDataType.fromSqlType(sqlType));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java b/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
new file mode 100644
index 0000000..c92d374
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pig.hadoop;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import org.apache.phoenix.pig.PhoenixPigConfiguration;
+
+/**
+ * 
+ * {@link RecordWriter} implementation for Phoenix
+ * 
+ * @author pkommireddi
+ *
+ */
+public class PhoenixRecordWriter extends RecordWriter<NullWritable, PhoenixRecord> {
+
+	private static final Log LOG = LogFactory.getLog(PhoenixRecordWriter.class);
+
+	private final Connection conn;
+	private final PhoenixPigConfiguration config;
+	private final long batchSize;
+	private final PreparedStatement statement;
+
+	// Number of records written so far through this writer.
+	private long numRecords = 0;
+
+	/**
+	 * @param conn the connection records are written through
+	 * @param config source of the batch size, upsert statement and column metadata
+	 * @throws SQLException if the upsert statement cannot be prepared
+	 */
+	public PhoenixRecordWriter(Connection conn, PhoenixPigConfiguration config) throws SQLException {
+		this.conn = conn;
+		this.config = config;
+		this.batchSize = config.getBatchSize();
+		this.statement = conn.prepareStatement(config.getUpsertStatement());
+	}
+
+
+	/**
+	 * Does nothing here: committing and closing the connection is handled by
+	 * {@link PhoenixOutputCommitter}.
+	 */
+	@Override
+	public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+	}
+
+	/**
+	 * Writes one record through the prepared statement and commits the
+	 * connection each time the running record count reaches a multiple of the
+	 * batch size.
+	 *
+	 * @throws IOException wrapping any {@link SQLException} raised while writing or committing
+	 */
+	@Override
+	public void write(NullWritable n, PhoenixRecord record) throws IOException, InterruptedException {
+		try {
+			record.write(statement, config.getColumnMetadataList());
+
+			if (++numRecords % batchSize == 0) {
+				LOG.debug("commit called on a batch of size : " + batchSize);
+				conn.commit();
+			}
+		} catch (SQLException e) {
+			throw new IOException("Exception while committing to database.", e);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java b/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
new file mode 100644
index 0000000..27fa10e
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.phoenix.job.JobManager;
+import org.apache.phoenix.memory.GlobalMemoryManager;
+import org.apache.phoenix.memory.MemoryManager;
+import org.apache.phoenix.optimize.QueryOptimizer;
+import org.apache.phoenix.util.ReadOnlyProps;
+
+
+
+/**
+ * 
+ * Base class for QueryService implementors.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class BaseQueryServicesImpl implements QueryServices {
+    private final ExecutorService executor;
+    private final MemoryManager memoryManager;
+    private final ReadOnlyProps props;
+    private final QueryOptimizer queryOptimizer;
+
+    /**
+     * Builds the shared services from the supplied options: a thread pool, a
+     * global memory manager sized as a percentage of the JVM's current total
+     * memory, the read-only properties and a query optimizer.
+     *
+     * @param options source of the thread pool, memory and property settings
+     */
+    public BaseQueryServicesImpl(QueryServicesOptions options) {
+        props = options.getProps();
+        executor = JobManager.createThreadPoolExec(
+                options.getKeepAliveMs(), options.getThreadPoolSize(), options.getQueueSize());
+        memoryManager = new GlobalMemoryManager(
+                Runtime.getRuntime().totalMemory() * options.getMaxMemoryPerc() / 100,
+                options.getMaxMemoryWaitMs());
+        // Created last because it is handed a reference to this instance.
+        queryOptimizer = new QueryOptimizer(this);
+    }
+
+    /** @return the executor created in the constructor */
+    @Override
+    public ExecutorService getExecutor() {
+        return this.executor;
+    }
+
+    /** @return the global memory manager created in the constructor */
+    @Override
+    public MemoryManager getMemoryManager() {
+        return this.memoryManager;
+    }
+
+    /** @return the properties taken from the options passed to the constructor */
+    @Override
+    public final ReadOnlyProps getProps() {
+        return this.props;
+    }
+
+    /** Does nothing in this base class. */
+    @Override
+    public void close() {
+    }
+
+    /** @return the query optimizer created in the constructor */
+    @Override
+    public QueryOptimizer getOptimizer() {
+        return this.queryOptimizer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/query/ChildQueryServices.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/query/ChildQueryServices.java b/src/main/java/org/apache/phoenix/query/ChildQueryServices.java
new file mode 100644
index 0000000..93613d6
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/query/ChildQueryServices.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import org.apache.phoenix.memory.ChildMemoryManager;
+import org.apache.phoenix.memory.MemoryManager;
+
+/**
+ * 
+ * Child QueryServices that delegates through to global QueryService.
+ * Used to track memory used by each org to allow a max percentage threshold.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ChildQueryServices extends DelegateConnectionQueryServices {
+    // Percentage used when MAX_TENANT_MEMORY_PERC_ATTRIB is not configured.
+    private static final int DEFAULT_MAX_ORG_MEMORY_PERC = 30;
+
+    private final MemoryManager memoryManager;
+
+    /**
+     * Wraps the given services and gives this child its own memory manager,
+     * built on the parent's manager with the configured percentage.
+     *
+     * @param services the parent services that calls are delegated to
+     */
+    public ChildQueryServices(ConnectionQueryServices services) {
+        super(services);
+        final int tenantMemoryPerc =
+                getProps().getInt(MAX_TENANT_MEMORY_PERC_ATTRIB, DEFAULT_MAX_ORG_MEMORY_PERC);
+        final MemoryManager parentManager = services.getMemoryManager();
+        this.memoryManager = new ChildMemoryManager(parentManager, tenantMemoryPerc);
+    }
+
+    /** @return the child memory manager created in the constructor, not the parent's */
+    @Override
+    public MemoryManager getMemoryManager() {
+        return this.memoryManager;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/query/ConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/query/ConfigurationFactory.java b/src/main/java/org/apache/phoenix/query/ConfigurationFactory.java
new file mode 100644
index 0000000..77d737e
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/query/ConfigurationFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+
+/**
+ * Creates {@link Configuration} instances that contain HBase/Hadoop settings.
+ *
+ * @author aaraujo
+ * @since 2.0
+ */
+public interface ConfigurationFactory {
+    /**
+     * Supplies a configuration object.
+     *
+     * @return Configuration containing HBase/Hadoop settings
+     */
+    Configuration getConfiguration();
+
+    /**
+     * Default implementation; each call delegates to
+     * {@link org.apache.hadoop.hbase.HBaseConfiguration#create()}.
+     */
+    public static class ConfigurationFactoryImpl implements ConfigurationFactory {
+        @Override
+        public Configuration getConfiguration() {
+            return HBaseConfiguration.create();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
new file mode 100644
index 0000000..9f304c5
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTableType;
+
+
+public interface ConnectionQueryServices extends QueryServices, MetaDataMutated {
+    /**
+     * Get (and create if necessary) a child QueryService for a given tenantId.
+     * The QueryService will be cached for the lifetime of the parent QueryService
+     * @param tenantId the organization ID
+     * @return the child QueryService
+     */
+    ConnectionQueryServices getChildQueryServices(ImmutableBytesWritable tenantId);
+
+    /**
+     * Get an HTableInterface by the given name. It is the caller's
+     * responsibility to close the returned HTableInterface.
+     * @param tableName the name of the HTable
+     * @return the HTableInterface
+     * @throws SQLException
+     */
+    HTableInterface getTable(byte[] tableName) throws SQLException;
+
+    HTableDescriptor getTableDescriptor(byte[] tableName) throws SQLException;
+
+    StatsManager getStatsManager();
+
+    List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException;
+
+    PhoenixConnection connect(String url, Properties info) throws SQLException;
+
+    // Operations that return a MetaDataMutationResult.
+    MetaDataMutationResult getTable(byte[] schemaName, byte[] tableName, long tableTimestamp, long clientTimetamp) throws SQLException;
+    MetaDataMutationResult createTable(List<Mutation> tableMetaData, PTableType tableType, Map<String,Object> tableProps, List<Pair<byte[],Map<String,Object>>> families, byte[][] splits) throws SQLException;
+    MetaDataMutationResult dropTable(List<Mutation> tableMetadata, PTableType tableType) throws SQLException;
+    MetaDataMutationResult addColumn(List<Mutation> tableMetaData, PTableType tableType, List<Pair<byte[],Map<String,Object>>> families) throws SQLException;
+    MetaDataMutationResult dropColumn(List<Mutation> tableMetadata, PTableType tableType) throws SQLException;
+    MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata, String parentTableName) throws SQLException;
+
+    MutationState updateData(MutationPlan plan) throws SQLException;
+
+    void init(String url, Properties props) throws SQLException;
+
+    int getLowestClusterHBaseVersion();
+    HBaseAdmin getAdmin() throws SQLException;
+
+    void clearTableRegionCache(byte[] tableName) throws SQLException;
+
+    boolean hasInvalidIndexConfiguration();
+}