You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by nd...@apache.org on 2015/12/22 22:38:06 UTC

phoenix git commit: PHOENIX-2492 Expose PhoenixRecordWritable outside of spark

Repository: phoenix
Updated Branches:
  refs/heads/master 05ff5618d -> c0d7a9fee


PHOENIX-2492 Expose PhoenixRecordWritable outside of spark


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c0d7a9fe
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c0d7a9fe
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c0d7a9fe

Branch: refs/heads/master
Commit: c0d7a9fee1a54eb20a870447e385e42cd6a4d81e
Parents: 05ff561
Author: Nick Dimiduk <nd...@apache.org>
Authored: Fri Dec 4 16:04:25 2015 -0800
Committer: Nick Dimiduk <nd...@apache.org>
Committed: Tue Dec 22 15:56:32 2015 -0500

----------------------------------------------------------------------
 .../mapreduce/PhoenixRecordWritable.java        | 192 +++++++++++++++++++
 1 file changed, 192 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/c0d7a9fe/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java
new file mode 100644
index 0000000..8d7d97a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.schema.types.*;
+import org.apache.phoenix.util.ColumnInfo;
+import org.joda.time.DateTime;
+
+import java.sql.Array;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class PhoenixRecordWritable implements DBWritable {
+
+    private final List<Object> upsertValues = new ArrayList<>();
+    private final Map<String, Object> resultMap = new HashMap<>();
+    private List<ColumnInfo> columnMetaDataList;
+
+    /** For serialization; do not use. */
+    public PhoenixRecordWritable() {
+        this(new ArrayList<ColumnInfo>());
+    }
+
+    public PhoenixRecordWritable(List<ColumnInfo> columnMetaDataList) {
+        this.columnMetaDataList = columnMetaDataList;
+    }
+
+    /**
+     * Helper method to create a {@link Array} for a specific {@link PDataType}, and set it on
+     * the provided {@code stmt}.
+     */
+    private static void setArrayInStatement(PreparedStatement stmt, PDataType<?> type,
+            Object[] obj, int position) throws SQLException {
+        Array sqlArray = stmt.getConnection().createArrayOf(
+                PDataType.arrayBaseType(type).getSqlTypeName(), obj);
+        stmt.setArray(position, sqlArray);
+    }
+
+    private static Object[] primativeArrayToObjectArray(byte[] a) {
+        final Byte[] ret = new Byte[a.length];
+        for (int i = 0; i < a.length; i++) {
+            ret[i] = a[i];
+        }
+        return ret;
+    }
+
+    private static Object[] primativeArrayToObjectArray(short[] a) {
+        final Short[] ret = new Short[a.length];
+        for (int i = 0; i < a.length; i++) {
+            ret[i] = a[i];
+        }
+        return ret;
+    }
+
+    private static Object[] primativeArrayToObjectArray(int[] a) {
+        final Integer[] ret = new Integer[a.length];
+        for (int i = 0; i < a.length; i++) {
+            ret[i] = a[i];
+        }
+        return ret;
+    }
+
+    private static Object[] primativeArrayToObjectArray(float[] a) {
+        final Float[] ret = new Float[a.length];
+        for (int i = 0; i < a.length; i++) {
+            ret[i] = a[i];
+        }
+        return ret;
+    }
+
+    private static Object[] primativeArrayToObjectArray(double[] a) {
+        final Double[] ret = new Double[a.length];
+        for (int i = 0; i < a.length; i++) {
+            ret[i] = a[i];
+        }
+        return ret;
+    }
+
+    private static Object[] primativeArrayToObjectArray(long[] a) {
+        final Long[] ret = new Long[a.length];
+        for (int i = 0; i < a.length; i++) {
+            ret[i] = a[i];
+        }
+        return ret;
+    }
+
+    @Override public void write(PreparedStatement statement) throws SQLException {
+        // make sure we at least line up in size
+        if (upsertValues.size() != columnMetaDataList.size()) {
+            throw new UnsupportedOperationException("Provided " + upsertValues.size()
+                    + " upsert values, but column metadata expects " + columnMetaDataList.size()
+                    + " columns.");
+        }
+
+        // correlate each value (v) to a column type (c) and an index (i)
+        for (int i = 0; i < upsertValues.size(); i++) {
+            Object v = upsertValues.get(i);
+            ColumnInfo c = columnMetaDataList.get(i);
+
+            if (v == null) {
+                statement.setNull(i + 1, c.getSqlType());
+                continue;
+            }
+
+            // both Java and Joda dates used to work in 4.2.3, but now they must be java.sql.Date
+            // can override any other types here as needed
+            final Object finalObj;
+            final PDataType<?> finalType;
+            if (v instanceof DateTime) {
+                finalObj = new java.sql.Date(((DateTime) v).getMillis());
+                finalType = PDate.INSTANCE;
+            } else if (v instanceof java.util.Date) {
+                finalObj = new java.sql.Date(((java.util.Date) v).getTime());
+                finalType = PDate.INSTANCE;
+            } else {
+                finalObj = v;
+                finalType = c.getPDataType();
+            }
+
+            if (finalObj instanceof Object[]) {
+                setArrayInStatement(statement, finalType, (Object[]) finalObj, i + 1);
+            } else if (finalObj instanceof byte[]) {
+                // PVarbinary and PBinary are provided as byte[] but are treated as SQL objects
+                if (PDataType.equalsAny(finalType, PVarbinary.INSTANCE, PBinary.INSTANCE)) {
+                    statement.setObject(i + 1, finalObj);
+                }
+                // otherwise set as array type
+                setArrayInStatement(statement, finalType, primativeArrayToObjectArray((byte[]) finalObj), i + 1);
+            } else if (finalObj instanceof short[]) {
+                setArrayInStatement(statement, finalType, primativeArrayToObjectArray((short[]) finalObj), i + 1);
+            } else if (finalObj instanceof int[]) {
+                setArrayInStatement(statement, finalType, primativeArrayToObjectArray((int[]) finalObj), i + 1);
+            } else if (finalObj instanceof long[]) {
+                setArrayInStatement(statement, finalType, primativeArrayToObjectArray((long[]) finalObj), i + 1);
+            } else if (finalObj instanceof float[]) {
+                setArrayInStatement(statement, finalType, primativeArrayToObjectArray((float[]) finalObj), i + 1);
+            } else if (finalObj instanceof double[]) {
+                setArrayInStatement(statement, finalType, primativeArrayToObjectArray((double[]) finalObj), i + 1);
+            } else {
+                statement.setObject(i + 1, finalObj);
+            }
+        }
+    }
+
+    @Override public void readFields(ResultSet resultSet) throws SQLException {
+        ResultSetMetaData metaData = resultSet.getMetaData();
+        for (int i = 1; i <= metaData.getColumnCount(); i++) {
+            // return the contents of a PhoenixArray, if necessary
+            Object value = resultSet.getObject(i);
+            if (value instanceof PhoenixArray) {
+                value = ((PhoenixArray) value).getArray();
+            }
+
+            // put a (ColumnLabel -> value) entry into the result map
+            resultMap.put(metaData.getColumnLabel(i), value);
+        }
+    }
+
+    /** Append an object to the list of values to upsert. */
+    public void add(Object value) {
+        upsertValues.add(value);
+    }
+
+    /** @return an immutable view on the {@link ResultSet} content. */
+    public Map<String, Object> getResultMap() {
+        return Collections.unmodifiableMap(resultMap);
+    }
+}