You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2017/03/16 21:11:49 UTC

[27/50] [abbrv] phoenix git commit: PHOENIX-3539 Fix bulkload for StorageScheme - ONE_CELL_PER_KEYVALUE_COLUMN (Samarth Jain and Ankit Singhal)

PHOENIX-3539 Fix bulkload for StorageScheme - ONE_CELL_PER_KEYVALUE_COLUMN (Samarth Jain and Ankit Singhal)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5f5662b2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5f5662b2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5f5662b2

Branch: refs/heads/calcite
Commit: 5f5662b24dad478c9cb0917f20e2af9e6a539266
Parents: c387260
Author: Samarth <sa...@salesforce.com>
Authored: Tue Feb 28 18:00:58 2017 -0800
Committer: Samarth <sa...@salesforce.com>
Committed: Tue Feb 28 18:00:58 2017 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/CsvBulkLoadToolIT.java      | 36 ++++++++++++++
 .../mapreduce/FormatToBytesWritableMapper.java  | 51 ++++++++++++--------
 .../mapreduce/FormatToKeyValueReducer.java      | 44 +++++++++++------
 .../flume/serializer/CsvEventSerializer.java    |  2 +-
 4 files changed, 97 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5f5662b2/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 9103bd8..5a186a0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -372,4 +372,40 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
             assertTrue(ex instanceof FileAlreadyExistsException); 
         }
     }
+
+    @Test
+    public void testImportInImmutableTable() throws Exception {
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE IMMUTABLE TABLE S.TABLE10 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE, CF1.T2 DATE, CF2.T3 DATE) ");
+
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input10.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,Name 1,1970/01/01,1970/02/01,1970/03/01");
+        printWriter.println("2,Name 2,1970/01/02,1970/02/02,1970/03/02");
+        printWriter.close();
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration()));
+        csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB, "yyyy/MM/dd");
+        int exitCode = csvBulkLoadTool.run(new String[] { "--input", "/tmp/input10.csv", "--table", "table10",
+                "--schema", "s", "--zookeeper", zkQuorum });
+        assertEquals(0, exitCode);
+        ResultSet rs = stmt.executeQuery("SELECT id, name, t, CF1.T2, CF2.T3 FROM s.table10 ORDER BY id");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals("Name 1", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3));
+        assertEquals(DateUtil.parseDate("1970-02-01"), rs.getDate(4));
+        assertEquals(DateUtil.parseDate("1970-03-01"), rs.getDate(5));
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("Name 2", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3));
+        assertEquals(DateUtil.parseDate("1970-02-02"), rs.getDate(4));
+        assertEquals(DateUtil.parseDate("1970-03-02"), rs.getDate(5));
+        assertFalse(rs.next());
+
+        rs.close();
+        stmt.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5f5662b2/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index 278489d..1dae981 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -47,7 +47,9 @@ import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -211,30 +213,41 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
     private void initColumnIndexes() throws SQLException {
         columnIndexes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
         int columnIndex = 0;
-        for(int index = 0; index < logicalNames.size(); index++) {
+        for (int index = 0; index < logicalNames.size(); index++) {
             PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
-            List<PColumn> cls = table.getColumns();
-            for (int i = 0; i < cls.size(); i++) {
-                PColumn c = cls.get(i);
-                byte[] family = new byte[0];
-                byte[] cq;
-                if (!SchemaUtil.isPKColumn(c)) {
-                    family = c.getFamilyName().getBytes();
-                    cq = c.getColumnQualifierBytes();
-                } else {
-                    cq = c.getName().getBytes();
-                }
-                byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
-                if (!columnIndexes.containsKey(cfn)) {
+            if (!table.getImmutableStorageScheme().equals(ImmutableStorageScheme.ONE_CELL_PER_COLUMN)) {
+                List<PColumnFamily> cfs = table.getColumnFamilies();
+                for (int i = 0; i < cfs.size(); i++) {
+                    byte[] family = cfs.get(i).getName().getBytes();
+                    byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES,
+                            QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES);
                     columnIndexes.put(cfn, new Integer(columnIndex));
                     columnIndex++;
                 }
+            } else {
+                List<PColumn> cls = table.getColumns();
+                for (int i = 0; i < cls.size(); i++) {
+                    PColumn c = cls.get(i);
+                    byte[] family = new byte[0];
+                    byte[] cq;
+                    if (!SchemaUtil.isPKColumn(c)) {
+                        family = c.getFamilyName().getBytes();
+                        cq = c.getColumnQualifierBytes();
+                    } else {
+                        cq = c.getName().getBytes();
+                    }
+                    byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
+                    if (!columnIndexes.containsKey(cfn)) {
+                        columnIndexes.put(cfn, new Integer(columnIndex));
+                        columnIndex++;
+                    }
+                }
+                byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
+                byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+                byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue);
+                columnIndexes.put(cfn, new Integer(columnIndex));
+                columnIndex++;
             }
-            byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
-            byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
-            byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue);
-            columnIndexes.put(cfn, new Integer(columnIndex));
-            columnIndex++;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5f5662b2/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index c529afe..07cf285 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -42,7 +42,9 @@ import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
 import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -95,25 +97,35 @@ public class FormatToKeyValueReducer
         int columnIndex = 0;
         for (int index = 0; index < logicalNames.size(); index++) {
             PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
-            List<PColumn> cls = table.getColumns();
-            for (int i = 0; i < cls.size(); i++) {
-                PColumn c = cls.get(i);
-                byte[] family = new byte[0];
-                byte[] cq;
-                if (!SchemaUtil.isPKColumn(c)) {
-                    family = c.getFamilyName().getBytes();
-                    cq = c.getColumnQualifierBytes();
-                } else {
-                    // TODO: samarth verify if this is the right thing to do here.
-                    cq = c.getName().getBytes();
-                }
-                byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
-                Pair<byte[], byte[]> pair = new Pair<>(family, cq);
-                if (!indexMap.containsKey(cfn)) {
-                    indexMap.put(cfn, new Integer(columnIndex));
+            if (!table.getImmutableStorageScheme().equals(ImmutableStorageScheme.ONE_CELL_PER_COLUMN)) {
+                List<PColumnFamily> cfs = table.getColumnFamilies();
+                for (int i = 0; i < cfs.size(); i++) {
+                    byte[] family = cfs.get(i).getName().getBytes();
+                    Pair<byte[], byte[]> pair = new Pair<>(family,
+                            QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES);
                     columnIndexes.put(new Integer(columnIndex), pair);
                     columnIndex++;
                 }
+            } else {
+                List<PColumn> cls = table.getColumns();
+                for (int i = 0; i < cls.size(); i++) {
+                    PColumn c = cls.get(i);
+                    byte[] family = new byte[0];
+                    byte[] cq;
+                    if (!SchemaUtil.isPKColumn(c)) {
+                        family = c.getFamilyName().getBytes();
+                        cq = c.getColumnQualifierBytes();
+                    } else {
+                        cq = c.getName().getBytes();
+                    }
+                    byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
+                    Pair<byte[], byte[]> pair = new Pair<>(family, cq);
+                    if (!indexMap.containsKey(cfn)) {
+                        indexMap.put(cfn, new Integer(columnIndex));
+                        columnIndexes.put(new Integer(columnIndex), pair);
+                        columnIndex++;
+                    }
+                }
             }
             byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
             byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5f5662b2/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
index 1521084..a856c3e 100644
--- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
+++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
@@ -189,7 +189,7 @@ public class CsvEventSerializer extends BaseEventSerializer {
 
 		public CSVRecord parse(String input) throws IOException {
 			CSVParser csvParser = new CSVParser(new StringReader(input), this.csvFormat);
-			return ((CSVRecord) Iterables.getFirst(csvParser, null));
+			return Iterables.getFirst(csvParser, null);
 		}
 	}