You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:18 UTC
[17/51] [partial] Initial commit
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/parse/TraverseNoParseNodeVisitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/parse/TraverseNoParseNodeVisitor.java b/src/main/java/org/apache/phoenix/parse/TraverseNoParseNodeVisitor.java
new file mode 100644
index 0000000..899df6f
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/parse/TraverseNoParseNodeVisitor.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.util.List;
+
+
+/**
+ *
+ * Visitor that traverses into no parse nodes
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class TraverseNoParseNodeVisitor<T> extends BaseParseNodeVisitor<T> {
+
+    // Every visitEnter below answers false and every visit/visitLeave answers
+    // null. Per the class description this means no node is traversed into;
+    // presumably the traversal driver skips a node's children when visitEnter
+    // returns false -- confirm against the accept() implementations.
+    // Subclasses override only the callbacks they care about.
+
+    // ---------------------------------------------------------------
+    // Terminal nodes
+    // ---------------------------------------------------------------
+
+    @Override
+    public T visit(ColumnParseNode node) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public T visit(LiteralParseNode node) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public T visit(BindParseNode node) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public T visit(WildcardParseNode node) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public T visit(FamilyWildcardParseNode node) throws SQLException {
+        return null;
+    }
+
+    // ---------------------------------------------------------------
+    // Boolean connectives: AND, OR, NOT
+    // ---------------------------------------------------------------
+
+    @Override
+    public boolean visitEnter(AndParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(AndParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(OrParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(OrParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(NotParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(NotParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+
+    // ---------------------------------------------------------------
+    // Predicates: comparison, LIKE, BETWEEN, IN, IS NULL
+    // ---------------------------------------------------------------
+
+    @Override
+    public boolean visitEnter(ComparisonParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(ComparisonParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(LikeParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(LikeParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(BetweenParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(BetweenParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(InListParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(InListParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(IsNullParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(IsNullParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+
+    // ---------------------------------------------------------------
+    // Value expressions: function, CASE, CAST, row value constructor
+    // ---------------------------------------------------------------
+
+    @Override
+    public boolean visitEnter(FunctionParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(FunctionParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(CaseParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(CaseParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(CastParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(CastParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(RowValueConstructorParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(RowValueConstructorParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+
+    // ---------------------------------------------------------------
+    // Arithmetic and string concatenation
+    // ---------------------------------------------------------------
+
+    @Override
+    public boolean visitEnter(AddParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(AddParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(SubtractParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(SubtractParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(MultiplyParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(MultiplyParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(DivideParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(DivideParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean visitEnter(StringConcatParseNode node) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public T visitLeave(StringConcatParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/parse/UnaryParseNode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/parse/UnaryParseNode.java b/src/main/java/org/apache/phoenix/parse/UnaryParseNode.java
new file mode 100644
index 0000000..356d047
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/parse/UnaryParseNode.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.util.Collections;
+
+/**
+ *
+ * Abstract node representing an expression that has a single child in SQL
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class UnaryParseNode extends CompoundParseNode {
+
+    /**
+     * Wraps the single operand in a one-element list and hands it to the
+     * compound-node superclass as its list of children.
+     *
+     * @param expr the sole child expression of this node
+     */
+    UnaryParseNode(ParseNode expr) {
+        super(Collections.<ParseNode>singletonList(expr));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/parse/UnsupportedAllParseNodeVisitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/parse/UnsupportedAllParseNodeVisitor.java b/src/main/java/org/apache/phoenix/parse/UnsupportedAllParseNodeVisitor.java
new file mode 100644
index 0000000..60f7edf
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/parse/UnsupportedAllParseNodeVisitor.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.List;
+
+
+/**
+ *
+ * Visitor that throws SQLFeatureNotSupportedException for every
+ * node. Meant to be sub-classed for the case of a small subset
+ * of nodes being supported, in which case only those applicable
+ * methods would be overridden.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class UnsupportedAllParseNodeVisitor<E> extends BaseParseNodeVisitor<E> {
+
+    // Every visit/visitEnter/visitLeave callback below rejects its node by
+    // throwing SQLFeatureNotSupportedException carrying node.toString() as
+    // the message. Subclasses override the callbacks for the nodes they
+    // actually support.
+
+    // ---------------------------------------------------------------
+    // Terminal nodes
+    // ---------------------------------------------------------------
+
+    @Override
+    public E visit(ColumnParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visit(LiteralParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visit(BindParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visit(WildcardParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visit(FamilyWildcardParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    // ---------------------------------------------------------------
+    // Boolean connectives: AND, OR, NOT
+    // ---------------------------------------------------------------
+
+    @Override
+    public boolean visitEnter(AndParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visitLeave(AndParseNode node, List<E> l) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public boolean visitEnter(OrParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visitLeave(OrParseNode node, List<E> l) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public boolean visitEnter(NotParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visitLeave(NotParseNode node, List<E> l) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    // ---------------------------------------------------------------
+    // Predicates: comparison, LIKE, BETWEEN, IN, IS NULL
+    // ---------------------------------------------------------------
+
+    @Override
+    public boolean visitEnter(ComparisonParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visitLeave(ComparisonParseNode node, List<E> l) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public boolean visitEnter(LikeParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visitLeave(LikeParseNode node, List<E> l) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public boolean visitEnter(BetweenParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visitLeave(BetweenParseNode node, List<E> l) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public boolean visitEnter(InListParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visitLeave(InListParseNode node, List<E> l) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public boolean visitEnter(IsNullParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visitLeave(IsNullParseNode node, List<E> l) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    // ---------------------------------------------------------------
+    // Value expressions: function, CAST
+    // ---------------------------------------------------------------
+
+    @Override
+    public boolean visitEnter(FunctionParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visitLeave(FunctionParseNode node, List<E> l) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public boolean visitEnter(CastParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visitLeave(CastParseNode node, List<E> l) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    // ---------------------------------------------------------------
+    // Arithmetic
+    // ---------------------------------------------------------------
+
+    @Override
+    public boolean visitEnter(AddParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visitLeave(AddParseNode node, List<E> l) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public boolean visitEnter(SubtractParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visitLeave(SubtractParseNode node, List<E> l) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public boolean visitEnter(MultiplyParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visitLeave(MultiplyParseNode node, List<E> l) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public boolean visitEnter(DivideParseNode node) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    @Override
+    public E visitLeave(DivideParseNode node, List<E> l) throws SQLException {
+        throw new SQLFeatureNotSupportedException(node.toString());
+    }
+
+    // ---------------------------------------------------------------
+    // Child-result collection hooks
+    // ---------------------------------------------------------------
+
+    /**
+     * Supplies no list for child results.
+     *
+     * @param size requested capacity; ignored
+     * @return always <code>null</code>
+     */
+    @Override
+    public List<E> newElementList(int size) {
+        return null;
+    }
+
+    /**
+     * Does nothing; child results are not collected by this visitor.
+     *
+     * @param a list of child results; ignored
+     * @param element child result; ignored
+     */
+    @Override
+    public void addElement(List<E> a, E element) {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/parse/UpsertStatement.java b/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
new file mode 100644
index 0000000..777a64c
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.util.Collections;
+import java.util.List;
+
+public class UpsertStatement extends SingleTableSQLStatement {
+    private final HintNode hint;
+    private final List<ColumnName> columns;
+    private final List<ParseNode> values;
+    private final SelectStatement select;
+
+    /**
+     * Builds the parsed form of an UPSERT statement.
+     *
+     * @param table target table, passed to the superclass
+     * @param hint hint node; <code>null</code> is replaced by {@link HintNode#EMPTY_HINT_NODE}
+     * @param columns target columns; <code>null</code> is replaced by an empty list
+     * @param values value expressions, stored as given (may be <code>null</code>)
+     * @param select source SELECT, stored as given (may be <code>null</code>)
+     * @param bindCount bind count, passed to the superclass
+     */
+    public UpsertStatement(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount) {
+        super(table, bindCount);
+        if (columns != null) {
+            this.columns = columns;
+        } else {
+            this.columns = Collections.<ColumnName>emptyList();
+        }
+        if (hint != null) {
+            this.hint = hint;
+        } else {
+            this.hint = HintNode.EMPTY_HINT_NODE;
+        }
+        this.values = values;
+        this.select = select;
+    }
+
+    /** @return the target columns; never <code>null</code>, possibly empty */
+    public List<ColumnName> getColumns() {
+        return this.columns;
+    }
+
+    /** @return the value expressions exactly as supplied to the constructor */
+    public List<ParseNode> getValues() {
+        return this.values;
+    }
+
+    /** @return the source SELECT exactly as supplied to the constructor */
+    public SelectStatement getSelect() {
+        return this.select;
+    }
+
+    /** @return the hint node; never <code>null</code> */
+    public HintNode getHint() {
+        return this.hint;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/parse/WildcardParseNode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/parse/WildcardParseNode.java b/src/main/java/org/apache/phoenix/parse/WildcardParseNode.java
new file mode 100644
index 0000000..a0e3b6e
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/parse/WildcardParseNode.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+
+
+
+/**
+ *
+ * Node representing the selection of all columns (*) in the SELECT clause of SQL
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class WildcardParseNode extends TerminalParseNode {
+    /** Shared wildcard node whose {@link #isRewrite()} is <code>false</code>. */
+    public static final WildcardParseNode INSTANCE = new WildcardParseNode(false);
+    /** Shared wildcard node whose {@link #isRewrite()} is <code>true</code>. */
+    public static final WildcardParseNode REWRITE_INSTANCE = new WildcardParseNode(true);
+
+    private final boolean isRewrite;
+
+    // Private: the only instances are the two constants above.
+    private WildcardParseNode(boolean isRewrite) {
+        this.isRewrite = isRewrite;
+    }
+
+    /** @return the flag this instance was created with */
+    public boolean isRewrite() {
+        return this.isRewrite;
+    }
+
+    /** Dispatches to the visitor's callback for wildcard nodes. */
+    @Override
+    public <T> T accept(ParseNodeVisitor<T> visitor) throws SQLException {
+        final T result = visitor.visit(this);
+        return result;
+    }
+
+    /** @return the SQL spelling of the wildcard, <code>*</code> */
+    @Override
+    public String toString() {
+        return "*";
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java b/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
new file mode 100644
index 0000000..eff0f87
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+
+import org.apache.phoenix.pig.hadoop.PhoenixOutputFormat;
+import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+
+/**
+ * StoreFunc that uses Phoenix to store data into HBase.
+ *
+ * Example usage: A = load 'testdata' as (a:chararray, b:chararray, c:chararray,
+ * d:chararray, e: datetime); STORE A into 'hbase://CORE.ENTITY_HISTORY' using
+ * org.apache.phoenix.pig.PhoenixHBaseStorage('localhost','-batchSize 5000');
+ *
+ * The above reads a file 'testdata' and writes the elements to HBase. First
+ * argument to this StoreFunc is the server, the 2nd (optional) argument is an
+ * option string whose only option, <code>-batchSize</code>, is the batch size
+ * for upserts via Phoenix.
+ *
+ * Note that Pig types must be in sync with the target Phoenix data types. This
+ * StoreFunc tries best to cast based on input Pig types and target Phoenix data
+ * types, but it is recommended to supply appropriate schema.
+ *
+ * This is only a STORE implementation. LoadFunc coming soon.
+ *
+ * @author pkommireddi
+ *
+ */
+@SuppressWarnings("rawtypes")
+public class PhoenixHBaseStorage implements StoreFuncInterface {
+
+    private PhoenixPigConfiguration config;
+    private String tableName;
+    private RecordWriter<NullWritable, PhoenixRecord> writer;
+    private String contextSignature = null;
+    private ResourceSchema schema;
+    private long batchSize;
+    private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat();
+
+    // Set of options permitted
+    private final static Options validOptions = new Options();
+    private final static CommandLineParser parser = new GnuParser();
+    private final static String SCHEMA = "_schema";
+
+    // Batch size used when -batchSize is not given. PhoenixPigConfiguration.setup
+    // replaces a non-positive value with the connection's mutate batch size.
+    private final static long UNSPECIFIED_BATCH_SIZE = 0L;
+
+    private final CommandLine configuredOptions;
+    private final String server;
+
+    /**
+     * Creates a store function with no options, i.e. without an explicit
+     * upsert batch size.
+     *
+     * @param server server to connect to
+     * @throws ParseException never in practice, since no options are parsed
+     */
+    public PhoenixHBaseStorage(String server) throws ParseException {
+        this(server, null);
+    }
+
+    /**
+     * Creates a store function.
+     *
+     * @param server server to connect to
+     * @param optString whitespace-separated options, e.g. <code>-batchSize 5000</code>;
+     *            may be <code>null</code> or blank for no options
+     * @throws ParseException if <code>optString</code> contains an unknown or malformed option
+     * @throws NumberFormatException if the <code>-batchSize</code> value is not a long
+     */
+    public PhoenixHBaseStorage(String server, String optString)
+            throws ParseException {
+        populateValidOptions();
+        this.server = server;
+
+        // Split on runs of whitespace so repeated or padding spaces do not
+        // produce empty tokens for the parser.
+        String trimmedOpts = optString == null ? "" : optString.trim();
+        String[] optsArr = trimmedOpts.length() == 0 ? new String[0] : trimmedOpts.split("\\s+");
+        try {
+            configuredOptions = parser.parse(validOptions, optsArr);
+        } catch (ParseException e) {
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp("[-batchSize]", validOptions);
+            throw e;
+        }
+
+        // getOptionValue returns null when -batchSize was not supplied; parsing
+        // that null used to make the one-argument constructor fail every time.
+        String batchSizeOpt = configuredOptions.getOptionValue("batchSize");
+        batchSize = batchSizeOpt == null ? UNSPECIFIED_BATCH_SIZE : Long.parseLong(batchSizeOpt);
+    }
+
+    private static void populateValidOptions() {
+        validOptions.addOption("batchSize", true, "Specify upsert batch size");
+    }
+
+    /**
+     * Returns UDFProperties based on <code>contextSignature</code>.
+     */
+    private Properties getUDFProperties() {
+        return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] { contextSignature });
+    }
+
+
+    /**
+     * Parse the HBase table name and configure job
+     *
+     * @param location store location of the form <code>hbase://TABLE_NAME</code>
+     * @param job job whose configuration receives the server, table name and batch size
+     * @throws IOException if <code>location</code> does not start with <code>hbase://</code>,
+     *             or if the stored schema cannot be deserialized
+     */
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException {
+        String prefix = "hbase://";
+        // Without the prefix there is no table name to extract; fail here with a
+        // clear message instead of configuring the job with a null or stale name.
+        if (location == null || !location.startsWith(prefix)) {
+            throw new IOException("Store location must be of the form '" + prefix
+                    + "<table name>' but was: " + location);
+        }
+        tableName = location.substring(prefix.length());
+        config = new PhoenixPigConfiguration(job.getConfiguration());
+        config.configure(server, tableName, batchSize);
+
+        // Pick up the schema that checkSchema stored under this signature, if any.
+        String serializedSchema = getUDFProperties().getProperty(contextSignature + SCHEMA);
+        if (serializedSchema != null) {
+            schema = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
+        }
+    }
+
+    /**
+     * Remembers the record writer that {@link #putNext(Tuple)} writes to.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void prepareToWrite(RecordWriter writer) throws IOException {
+        this.writer = writer;
+    }
+
+    /**
+     * Copies every field of the tuple into a {@link PhoenixRecord} and hands it
+     * to the record writer.
+     *
+     * @param t tuple to store
+     * @throws IOException if reading the tuple or writing the record fails
+     */
+    @Override
+    public void putNext(Tuple t) throws IOException {
+        ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();
+
+        PhoenixRecord record = new PhoenixRecord(fieldSchemas);
+
+        for (int i = 0; i < t.size(); i++) {
+            record.add(t.get(i));
+        }
+
+        try {
+            writer.write(null, record);
+        } catch (InterruptedException e) {
+            // Restore the interrupt status so code further up the stack can see it.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
+        this.contextSignature = signature;
+    }
+
+    @Override
+    public void cleanupOnFailure(String location, Job job) throws IOException {
+    }
+
+    @Override
+    public void cleanupOnSuccess(String location, Job job) throws IOException {
+    }
+
+    /**
+     * Returns the location unchanged.
+     */
+    @Override
+    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+        return location;
+    }
+
+    @Override
+    public OutputFormat getOutputFormat() throws IOException {
+        return outputFormat;
+    }
+
+    /**
+     * Keeps the schema and stores its serialized form in the UDF properties
+     * under the context signature, where {@link #setStoreLocation(String, Job)}
+     * looks for it.
+     */
+    @Override
+    public void checkSchema(ResourceSchema s) throws IOException {
+        schema = s;
+        getUDFProperties().setProperty(contextSignature + SCHEMA, ObjectSerializer.serialize(schema));
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java b/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
new file mode 100644
index 0000000..f0bf51c
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pig;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.QueryUtil;
+
+/**
+ * A container for configuration to be used with {@link PhoenixHBaseStorage}.
+ * All settings are kept in the wrapped Hadoop {@link Configuration} under the
+ * keys declared below; the column metadata of the target table is cached in
+ * this object after the first successful {@link #setup(Connection)}.
+ *
+ * @author pkommireddi
+ *
+ */
+public class PhoenixPigConfiguration {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixPigConfiguration.class);
+
+    /**
+     * Speculative execution of Map tasks
+     */
+    public static final String MAP_SPECULATIVE_EXEC = "mapred.map.tasks.speculative.execution";
+
+    /**
+     * Speculative execution of Reduce tasks
+     */
+    public static final String REDUCE_SPECULATIVE_EXEC = "mapred.reduce.tasks.speculative.execution";
+
+    public static final String SERVER_NAME = "phoenix.hbase.server.name";
+
+    public static final String TABLE_NAME = "phoenix.hbase.table.name";
+
+    public static final String UPSERT_STATEMENT = "phoenix.upsert.stmt";
+
+    public static final String UPSERT_BATCH_SIZE = "phoenix.upsert.batch.size";
+
+    public static final long DEFAULT_UPSERT_BATCH_SIZE = 1000;
+
+    private final Configuration conf;
+
+    private Connection conn;
+    private List<ColumnInfo> columnMetadataList;
+
+    public PhoenixPigConfiguration(Configuration conf) {
+        this.conf = conf;
+    }
+
+    /**
+     * Records the server, table name and batch size in the configuration and
+     * turns off speculative execution for both map and reduce tasks.
+     *
+     * @param server server name, stored under {@link #SERVER_NAME}
+     * @param tableName table name, stored under {@link #TABLE_NAME}
+     * @param batchSize upsert batch size, stored under {@link #UPSERT_BATCH_SIZE};
+     *            a non-positive value is replaced in {@link #setup(Connection)}
+     */
+    public void configure(String server, String tableName, long batchSize) {
+        conf.set(SERVER_NAME, server);
+        conf.set(TABLE_NAME, tableName);
+        conf.setLong(UPSERT_BATCH_SIZE, batchSize);
+        conf.setBoolean(MAP_SPECULATIVE_EXEC, false);
+        conf.setBoolean(REDUCE_SPECULATIVE_EXEC, false);
+    }
+
+    /**
+     * Creates a {@link Connection} with autoCommit set to false and runs
+     * {@link #setup(Connection)} on it. If that initialization fails, the
+     * connection is closed before the failure is rethrown.
+     *
+     * @return the opened and initialized connection
+     * @throws SQLException if the connection cannot be opened or initialized
+     */
+    public Connection getConnection() throws SQLException {
+        Properties props = new Properties();
+        conn = DriverManager.getConnection(QueryUtil.getUrl(this.conf.get(SERVER_NAME)), props).unwrap(PhoenixConnection.class);
+        try {
+            conn.setAutoCommit(false);
+
+            setup(conn);
+        } catch (SQLException e) {
+            // Do not leak the connection the caller will never receive.
+            try {
+                conn.close();
+            } catch (SQLException ignored) {
+                // The original failure is the one worth reporting.
+            }
+            throw e;
+        }
+
+        return conn;
+    }
+
+    /**
+     * This method creates the Upsert statement and the Column Metadata
+     * for the Pig query using {@link PhoenixHBaseStorage}. It also
+     * determines the batch size based on user provided options.
+     *
+     * @param conn connection used to read the table's column metadata; it is
+     *            cast to {@link PhoenixConnection} when the configured batch
+     *            size is not positive
+     * @throws SQLException if the column metadata cannot be read
+     */
+    public void setup(Connection conn) throws SQLException {
+        // Reset batch size
+        long batchSize = getBatchSize() <= 0 ? ((PhoenixConnection) conn).getMutateBatchSize() : getBatchSize();
+        conf.setLong(UPSERT_BATCH_SIZE, batchSize);
+
+        if (columnMetadataList == null) {
+            // Collect into a local list and publish it only once fully read, so
+            // a failed lookup is retried on the next call instead of leaving an
+            // empty list behind.
+            List<ColumnInfo> columns = new ArrayList<ColumnInfo>();
+            String[] tableMetadata = getTableMetadata(getTableName());
+            ResultSet rs = conn.getMetaData().getColumns(null, tableMetadata[0], tableMetadata[1], null);
+            try {
+                while (rs.next()) {
+                    columns.add(new ColumnInfo(rs.getString(QueryUtil.COLUMN_NAME_POSITION), rs.getInt(QueryUtil.DATA_TYPE_POSITION)));
+                }
+            } finally {
+                rs.close();
+            }
+            columnMetadataList = columns;
+        }
+
+        // Generating UPSERT statement without column name information.
+        String upsertStmt = QueryUtil.constructUpsertStatement(null, getTableName(), columnMetadataList.size());
+        LOG.info("Phoenix Upsert Statement: " + upsertStmt);
+        conf.set(UPSERT_STATEMENT, upsertStmt);
+    }
+
+    /** @return the value stored under {@link #UPSERT_STATEMENT}, or null if unset */
+    public String getUpsertStatement() {
+        return conf.get(UPSERT_STATEMENT);
+    }
+
+    /**
+     * @return the value stored under {@link #UPSERT_BATCH_SIZE}, or
+     *         {@link #DEFAULT_UPSERT_BATCH_SIZE} if unset
+     */
+    public long getBatchSize() {
+        return conf.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
+    }
+
+
+    /** @return the value stored under {@link #SERVER_NAME}, or null if unset */
+    public String getServer() {
+        return conf.get(SERVER_NAME);
+    }
+
+    /** @return the cached column metadata, or null before a successful {@link #setup(Connection)} */
+    public List<ColumnInfo> getColumnMetadataList() {
+        return columnMetadataList;
+    }
+
+    /** @return the value stored under {@link #TABLE_NAME}, or null if unset */
+    public String getTableName() {
+        return conf.get(TABLE_NAME);
+    }
+
+    /**
+     * Splits a table name on dots into a two-element array of schema and table.
+     * A name without a dot yields an empty schema; parts after the second are
+     * ignored.
+     */
+    private String[] getTableMetadata(String table) {
+        String[] schemaAndTable = table.split("\\.");
+        assert schemaAndTable.length >= 1;
+
+        if (schemaAndTable.length == 1) {
+            return new String[] { "", schemaAndTable[0] };
+        }
+
+        return new String[] { schemaAndTable[0], schemaAndTable[1] };
+    }
+
+
+    /** @return the wrapped Hadoop configuration */
+    public Configuration getConfiguration() {
+        return this.conf;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/pig/TypeUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/pig/TypeUtil.java b/src/main/java/org/apache/phoenix/pig/TypeUtil.java
new file mode 100644
index 0000000..4bb2637
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/pig/TypeUtil.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pig;
+
+import java.io.IOException;
+import java.sql.*;
+
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.joda.time.DateTime;
+
+import org.apache.phoenix.schema.PDataType;
+
+public class TypeUtil {
+
+ private static final Utf8StorageConverter utf8Converter = new Utf8StorageConverter();
+
+ /**
+ * This method returns the most appropriate PDataType associated with
+ * the incoming Pig type. Note for Pig DataType DATETIME, returns DATE as
+ * inferredSqlType.
+ *
+ * This is later used to make a cast to targetPhoenixType accordingly. See
+ * {@link #castPigTypeToPhoenix(Object, byte, PDataType)}
+ *
+ * @param obj
+ * @return PDataType
+ */
+ public static PDataType getType(Object obj, byte type) {
+ if (obj == null) {
+ return null;
+ }
+
+ PDataType sqlType;
+
+ switch (type) {
+ case DataType.BYTEARRAY:
+ sqlType = PDataType.VARBINARY;
+ break;
+ case DataType.CHARARRAY:
+ sqlType = PDataType.VARCHAR;
+ break;
+ case DataType.DOUBLE:
+ sqlType = PDataType.DOUBLE;
+ break;
+ case DataType.FLOAT:
+ sqlType = PDataType.FLOAT;
+ break;
+ case DataType.INTEGER:
+ sqlType = PDataType.INTEGER;
+ break;
+ case DataType.LONG:
+ sqlType = PDataType.LONG;
+ break;
+ case DataType.BOOLEAN:
+ sqlType = PDataType.BOOLEAN;
+ break;
+ case DataType.DATETIME:
+ sqlType = PDataType.DATE;
+ break;
+ default:
+ throw new RuntimeException("Unknown type " + obj.getClass().getName()
+ + " passed to PhoenixHBaseStorage");
+ }
+
+ return sqlType;
+
+ }
+
+ /**
+ * This method encodes a value with Phoenix data type. It begins
+ * with checking whether an object is BINARY and makes a call to
+ * {@link #castBytes(Object, PDataType)} to convery bytes to
+ * targetPhoenixType
+ *
+ * @param o
+ * @param targetPhoenixType
+ * @return Object
+ */
+ public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) {
+ PDataType inferredPType = getType(o, objectType);
+
+ if(inferredPType == null) {
+ return null;
+ }
+
+ if(inferredPType == PDataType.VARBINARY && targetPhoenixType != PDataType.VARBINARY) {
+ try {
+ o = castBytes(o, targetPhoenixType);
+ inferredPType = getType(o, DataType.findType(o));
+ } catch (IOException e) {
+ throw new RuntimeException("Error while casting bytes for object " +o);
+ }
+ }
+
+ if(inferredPType == PDataType.DATE) {
+ int inferredSqlType = targetPhoenixType.getSqlType();
+
+ if(inferredSqlType == Types.DATE) {
+ return new Date(((DateTime)o).getMillis());
+ }
+ if(inferredSqlType == Types.TIME) {
+ return new Time(((DateTime)o).getMillis());
+ }
+ if(inferredSqlType == Types.TIMESTAMP) {
+ return new Timestamp(((DateTime)o).getMillis());
+ }
+ }
+
+ if (targetPhoenixType == inferredPType || inferredPType.isCoercibleTo(targetPhoenixType)) {
+ return inferredPType.toObject(o, targetPhoenixType);
+ }
+
+ throw new RuntimeException(o.getClass().getName()
+ + " cannot be coerced to "+targetPhoenixType.toString());
+ }
+
+ /**
+ * This method converts bytes to the target type required
+ * for Phoenix. It uses {@link Utf8StorageConverter} for
+ * the conversion.
+ *
+ * @param o
+ * @param targetPhoenixType
+ * @return Object
+ * @throws IOException
+ */
+ public static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException {
+ byte[] bytes = ((DataByteArray)o).get();
+
+ switch(targetPhoenixType) {
+ case CHAR:
+ case VARCHAR:
+ return utf8Converter.bytesToCharArray(bytes);
+ case UNSIGNED_INT:
+ case INTEGER:
+ return utf8Converter.bytesToInteger(bytes);
+ case BOOLEAN:
+ return utf8Converter.bytesToBoolean(bytes);
+// case DECIMAL: not in Pig v 0.11.0, so using double for now
+// return utf8Converter.bytesToBigDecimal(bytes);
+ case DECIMAL:
+ return utf8Converter.bytesToDouble(bytes);
+ case UNSIGNED_LONG:
+ case LONG:
+ return utf8Converter.bytesToLong(bytes);
+ case TIME:
+ case TIMESTAMP:
+ case DATE:
+ return utf8Converter.bytesToDateTime(bytes);
+ default:
+ return o;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java b/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
new file mode 100644
index 0000000..503f570
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pig.hadoop;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import org.apache.phoenix.jdbc.PhoenixStatement;
+
+/**
+ *
+ * {@link OutputCommitter} implementation for Pig tasks using Phoenix
+ * connections to upsert to HBase
+ *
+ * @author pkommireddi
+ *
+ */
+public class PhoenixOutputCommitter extends OutputCommitter {
+ // One logger per class rather than one per committer instance.
+ private static final Log LOG = LogFactory.getLog(PhoenixOutputCommitter.class);
+
+ private final PhoenixOutputFormat outputFormat;
+
+ /**
+ * @param outputFormat the output format whose connection this committer commits
+ * @throws IllegalArgumentException if outputFormat is null
+ */
+ public PhoenixOutputCommitter(PhoenixOutputFormat outputFormat) {
+ if(outputFormat == null) {
+ throw new IllegalArgumentException("PhoenixOutputFormat must not be null.");
+ }
+ this.outputFormat = outputFormat;
+ }
+
+ /**
+ * Currently does nothing.
+ *
+ * TODO implement rollback functionality.
+ *
+ * {@link PhoenixStatement#execute(String)} is buffered on the client, this makes
+ * it difficult to implement rollback as once a commit is issued it's hard to go
+ * back all the way to undo.
+ */
+ @Override
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ }
+
+ /**
+ * Commits and then closes the connection obtained from the output format.
+ *
+ * @throws IOException if the connection is null or closed, or the commit fails
+ */
+ @Override
+ public void commitTask(TaskAttemptContext context) throws IOException {
+ commit(outputFormat.getConnection(context.getConfiguration()));
+ }
+
+ /** Always requests a task commit. */
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+ return true;
+ }
+
+ /** No job-level setup is performed. */
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ }
+
+ /** No task-level setup is performed. */
+ @Override
+ public void setupTask(TaskAttemptContext context) throws IOException {
+ }
+
+ /**
+ * Commit a transaction on task completion. The connection is closed
+ * afterwards whether or not the commit succeeded; a failure to close is
+ * only logged.
+ *
+ * @param connection the connection to commit and close
+ * @throws IOException if the connection is null or already closed, if its
+ * state cannot be checked, or if the commit fails
+ */
+ private void commit(Connection connection) throws IOException {
+ try {
+ if (connection == null || connection.isClosed()) {
+ throw new IOException("Trying to commit a connection that is null or closed: "+ connection);
+ }
+ } catch (SQLException e) {
+ throw new IOException("Exception calling isClosed on connection", e);
+ }
+
+ try {
+ LOG.debug("Commit called on task completion");
+ connection.commit();
+ } catch (SQLException e) {
+ throw new IOException("Exception while trying to commit a connection. ", e);
+ } finally {
+ try {
+ LOG.debug("Closing connection to database on task completion");
+ connection.close();
+ } catch (SQLException e) {
+ // Best effort: a close failure must not mask the commit outcome.
+ LOG.warn("Exception while trying to close database connection", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java b/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
new file mode 100644
index 0000000..814dc33
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pig.hadoop;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import org.apache.phoenix.pig.PhoenixPigConfiguration;
+
+/**
+ * {@link OutputFormat} implementation for Phoenix
+ *
+ * @author pkommireddi
+ *
+ */
+public class PhoenixOutputFormat extends OutputFormat<NullWritable, PhoenixRecord> {
+ private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class);
+
+ private Connection connection;
+ private PhoenixPigConfiguration config;
+
+ /** Performs no output validation. */
+ @Override
+ public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
+ }
+
+ /**
+ * Creates a committer bound to this output format.
+ *
+ * TODO Implement {@link OutputCommitter} to rollback in case of task failure
+ */
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+ return new PhoenixOutputCommitter(this);
+ }
+
+ /**
+ * Creates a record writer on top of the shared connection.
+ *
+ * @throws IOException if the connection or the writer cannot be created
+ */
+ @Override
+ public RecordWriter<NullWritable, PhoenixRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+ // Obtain the connection first: doing so also initializes config.
+ final Connection sharedConnection = getConnection(context.getConfiguration());
+ try {
+ return new PhoenixRecordWriter(sharedConnection, config);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Returns the database connection, creating it on first use. A single
+ * instance is created and handed out on every later call.
+ *
+ * @param configuration the job configuration used to build the connection
+ * @return the shared connection
+ * @throws IOException if the connection cannot be established
+ */
+ synchronized Connection getConnection(Configuration configuration) throws IOException {
+ if (connection == null) {
+ config = new PhoenixPigConfiguration(configuration);
+ try {
+ LOG.info("Initializing new Phoenix connection...");
+ connection = config.getConnection();
+ LOG.info("Initialized Phoenix connection, autoCommit="+ connection.getAutoCommit());
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ return connection;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java b/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
new file mode 100644
index 0000000..2973c08
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pig.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
+
+import org.apache.phoenix.pig.TypeUtil;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
+
+/**
+ * A {@link Writable} representing a Phoenix record. This class
+ * does a type mapping and sets the value accordingly in the
+ * {@link PreparedStatement}
+ *
+ * @author pkommireddi
+ *
+ */
+public class PhoenixRecord implements Writable {
+
+ private final List<Object> values;
+ private final ResourceFieldSchema[] fieldSchemas;
+
+ /**
+ * @param fieldSchemas Pig field schemas for the record's values; when null,
+ * the Pig type of each value is looked up from the value itself
+ */
+ public PhoenixRecord(ResourceFieldSchema[] fieldSchemas) {
+ this.values = new ArrayList<Object>();
+ this.fieldSchemas = fieldSchemas;
+ }
+
+ /** Reads nothing. */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ }
+
+ /** Writes nothing. */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ }
+
+ /**
+ * Binds every value of this record to the statement, converting it to the
+ * SQL type of the matching column, and then executes the statement.
+ *
+ * @param statement the prepared statement to bind and execute
+ * @param columnMetadataList column metadata, one entry per statement parameter
+ * @throws SQLException if binding or executing the statement fails
+ */
+ public void write(PreparedStatement statement, List<ColumnInfo> columnMetadataList) throws SQLException {
+ for (int i = 0; i < columnMetadataList.size(); i++) {
+ final Object pigValue = values.get(i);
+
+ final byte pigType;
+ if (fieldSchemas == null) {
+ pigType = DataType.findType(pigValue);
+ } else {
+ pigType = fieldSchemas[i].getType();
+ }
+
+ final ColumnInfo column = columnMetadataList.get(i);
+ final Object upsertValue = convertTypeSpecificValue(pigValue, pigType, column.getSqlType());
+
+ // JDBC parameter positions are 1-based.
+ final int parameterIndex = i + 1;
+ if (upsertValue == null) {
+ statement.setNull(parameterIndex, columnMetadataList.get(i).getSqlType());
+ } else {
+ statement.setObject(parameterIndex, upsertValue, columnMetadataList.get(i).getSqlType());
+ }
+ }
+
+ statement.execute();
+ }
+
+ /** Appends a value; values are bound in the order in which they were added. */
+ public void add(Object value) {
+ values.add(value);
+ }
+
+ /** Converts a Pig value to the Phoenix type that corresponds to the SQL type. */
+ private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) {
+ return TypeUtil.castPigTypeToPhoenix(o, type, PDataType.fromSqlType(sqlType));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java b/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
new file mode 100644
index 0000000..c92d374
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pig.hadoop;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import org.apache.phoenix.pig.PhoenixPigConfiguration;
+
+/**
+ *
+ * {@link RecordWriter} implementation for Phoenix
+ *
+ * @author pkommireddi
+ *
+ */
+public class PhoenixRecordWriter extends RecordWriter<NullWritable, PhoenixRecord> {
+
+ private static final Log LOG = LogFactory.getLog(PhoenixRecordWriter.class);
+
+ private final Connection conn;
+ private final PhoenixPigConfiguration config;
+ private final long batchSize;
+ private final PreparedStatement statement;
+
+ // Number of records written so far through this writer.
+ private long numRecords = 0;
+
+ /**
+ * Prepares the upsert statement taken from the configuration on the given
+ * connection.
+ *
+ * @param conn the connection to write through
+ * @param config source of the upsert statement, batch size and column metadata
+ * @throws SQLException if the statement cannot be prepared
+ */
+ public PhoenixRecordWriter(Connection conn, PhoenixPigConfiguration config) throws SQLException {
+ this.conn = conn;
+ this.config = config;
+ this.batchSize = config.getBatchSize();
+ this.statement = this.conn.prepareStatement(config.getUpsertStatement());
+ }
+
+
+ /**
+ * Does nothing: committing and closing the connection is handled by
+ * {@link PhoenixOutputCommitter}.
+ */
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ }
+
+ /**
+ * Writes one record and commits the connection each time the running
+ * record count reaches a multiple of the batch size.
+ *
+ * @throws IOException if writing the record or committing fails
+ */
+ @Override
+ public void write(NullWritable n, PhoenixRecord record) throws IOException, InterruptedException {
+ try {
+ record.write(statement, config.getColumnMetadataList());
+ numRecords += 1;
+
+ final boolean batchFull = (numRecords % batchSize) == 0;
+ if (batchFull) {
+ LOG.debug("commit called on a batch of size : " + batchSize);
+ conn.commit();
+ }
+ } catch (SQLException e) {
+ throw new IOException("Exception while committing to database.", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java b/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
new file mode 100644
index 0000000..27fa10e
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.phoenix.job.JobManager;
+import org.apache.phoenix.memory.GlobalMemoryManager;
+import org.apache.phoenix.memory.MemoryManager;
+import org.apache.phoenix.optimize.QueryOptimizer;
+import org.apache.phoenix.util.ReadOnlyProps;
+
+
+
+/**
+ *
+ * Base class for QueryService implementors.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class BaseQueryServicesImpl implements QueryServices {
+ private final ExecutorService executor;
+ private final MemoryManager memoryManager;
+ private final ReadOnlyProps props;
+ private final QueryOptimizer queryOptimizer;
+
+ /**
+ * Builds the shared services from the given options: a thread pool, a
+ * global memory manager, the read-only properties and a query optimizer.
+ *
+ * @param options supplies the thread pool settings, the memory percentage
+ * and wait time, and the properties
+ */
+ public BaseQueryServicesImpl(QueryServicesOptions options) {
+ this.executor = JobManager.createThreadPoolExec(
+ options.getKeepAliveMs(),
+ options.getThreadPoolSize(),
+ options.getQueueSize());
+ // The memory budget is a percentage of Runtime.totalMemory().
+ // NOTE(review): totalMemory() is the heap currently reserved by the JVM and
+ // can be below maxMemory() - confirm that this is the intended baseline.
+ this.memoryManager = new GlobalMemoryManager(
+ Runtime.getRuntime().totalMemory() * options.getMaxMemoryPerc() / 100,
+ options.getMaxMemoryWaitMs());
+ this.props = options.getProps();
+ // Created last and handed this instance, whose fields above are already set.
+ this.queryOptimizer = new QueryOptimizer(this);
+ }
+
+ /** @return the executor created in the constructor */
+ @Override
+ public ExecutorService getExecutor() {
+ return executor;
+ }
+
+ /** @return the global memory manager created in the constructor */
+ @Override
+ public MemoryManager getMemoryManager() {
+ return memoryManager;
+ }
+
+ /** @return the properties taken from the options passed to the constructor */
+ @Override
+ public final ReadOnlyProps getProps() {
+ return props;
+ }
+
+ /** Does nothing in this base class. */
+ @Override
+ public void close() {
+ }
+
+ /** @return the query optimizer created in the constructor */
+ @Override
+ public QueryOptimizer getOptimizer() {
+ return queryOptimizer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/query/ChildQueryServices.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/query/ChildQueryServices.java b/src/main/java/org/apache/phoenix/query/ChildQueryServices.java
new file mode 100644
index 0000000..93613d6
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/query/ChildQueryServices.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import org.apache.phoenix.memory.ChildMemoryManager;
+import org.apache.phoenix.memory.MemoryManager;
+
+/**
+ *
+ * Child QueryServices that delegates through to global QueryService.
+ * Used to track memory used by each org to allow a max percentage threshold.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ChildQueryServices extends DelegateConnectionQueryServices {
+ // Percentage used when MAX_TENANT_MEMORY_PERC_ATTRIB is not configured.
+ private static final int DEFAULT_MAX_ORG_MEMORY_PERC = 30;
+
+ private final MemoryManager memoryManager;
+
+ /**
+ * Wraps the given services and gives this child its own memory manager,
+ * built on the parent's memory manager and the configured percentage.
+ *
+ * @param services the parent services to delegate to
+ */
+ public ChildQueryServices(ConnectionQueryServices services) {
+ super(services);
+ final int tenantMemoryPercent =
+ getProps().getInt(MAX_TENANT_MEMORY_PERC_ATTRIB, DEFAULT_MAX_ORG_MEMORY_PERC);
+ final MemoryManager parentMemoryManager = services.getMemoryManager();
+ this.memoryManager = new ChildMemoryManager(parentMemoryManager, tenantMemoryPercent);
+ }
+
+ /** @return the child memory manager rather than the delegate's */
+ @Override
+ public MemoryManager getMemoryManager() {
+ return memoryManager;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/query/ConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/query/ConfigurationFactory.java b/src/main/java/org/apache/phoenix/query/ConfigurationFactory.java
new file mode 100644
index 0000000..77d737e
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/query/ConfigurationFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+
+/**
+ * Creates {@link Configuration} instances that contain HBase/Hadoop settings.
+ *
+ * @author aaraujo
+ * @since 2.0
+ */
+public interface ConfigurationFactory {
+ /**
+ * Supplies a Configuration carrying the HBase/Hadoop settings.
+ *
+ * @return Configuration containing HBase/Hadoop settings
+ */
+ Configuration getConfiguration();
+
+ /**
+ * Default implementation, backed by
+ * {@link org.apache.hadoop.hbase.HBaseConfiguration#create()}.
+ * A class nested in an interface is implicitly static.
+ */
+ class ConfigurationFactoryImpl implements ConfigurationFactory {
+ @Override
+ public Configuration getConfiguration() {
+ final Configuration hbaseConfiguration = HBaseConfiguration.create();
+ return hbaseConfiguration;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
new file mode 100644
index 0000000..9f304c5
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTableType;
+
+
+public interface ConnectionQueryServices extends QueryServices, MetaDataMutated {
+ // Interface methods are implicitly public, so the modifier is omitted throughout.
+
+ /**
+ * Get (and create if necessary) a child QueryService for a given tenantId.
+ * The QueryService will be cached for the lifetime of the parent QueryService
+ * @param tenantId the organization ID
+ * @return the child QueryService
+ */
+ ConnectionQueryServices getChildQueryServices(ImmutableBytesWritable tenantId);
+
+ /**
+ * Get an HTableInterface by the given name. It is the caller's
+ * responsibility to close the returned HTableInterface.
+ * @param tableName the name of the HTable
+ * @return the HTableInterface
+ * @throws SQLException
+ */
+ HTableInterface getTable(byte[] tableName) throws SQLException;
+
+ HTableDescriptor getTableDescriptor(byte[] tableName) throws SQLException;
+
+ StatsManager getStatsManager();
+
+ List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException;
+
+ PhoenixConnection connect(String url, Properties info) throws SQLException;
+
+ // Operations that return a MetaDataMutationResult.
+ MetaDataMutationResult getTable(byte[] schemaName, byte[] tableName, long tableTimestamp, long clientTimetamp) throws SQLException;
+ MetaDataMutationResult createTable(List<Mutation> tableMetaData, PTableType tableType, Map<String,Object> tableProps, List<Pair<byte[],Map<String,Object>>> families, byte[][] splits) throws SQLException;
+ MetaDataMutationResult dropTable(List<Mutation> tableMetadata, PTableType tableType) throws SQLException;
+ MetaDataMutationResult addColumn(List<Mutation> tableMetaData, PTableType tableType, List<Pair<byte[],Map<String,Object>>> families) throws SQLException;
+ MetaDataMutationResult dropColumn(List<Mutation> tableMetadata, PTableType tableType) throws SQLException;
+ MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata, String parentTableName) throws SQLException;
+
+ MutationState updateData(MutationPlan plan) throws SQLException;
+
+ void init(String url, Properties props) throws SQLException;
+
+ int getLowestClusterHBaseVersion();
+ HBaseAdmin getAdmin() throws SQLException;
+
+ void clearTableRegionCache(byte[] tableName) throws SQLException;
+
+ boolean hasInvalidIndexConfiguration();
+}