You are viewing a plain-text version of this content. The canonical link for it (the original "here" hyperlink) was not preserved in this plain-text copy.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2017/03/16 21:11:49 UTC
[27/50] [abbrv] phoenix git commit: PHOENIX-3539 Fix bulkload for StorageScheme - ONE_CELL_PER_KEYVALUE_COLUMN (Samarth Jain and Ankit Singhal)
PHOENIX-3539 Fix bulkload for StorageScheme - ONE_CELL_PER_KEYVALUE_COLUMN (Samarth Jain and Ankit Singhal)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5f5662b2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5f5662b2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5f5662b2
Branch: refs/heads/calcite
Commit: 5f5662b24dad478c9cb0917f20e2af9e6a539266
Parents: c387260
Author: Samarth <sa...@salesforce.com>
Authored: Tue Feb 28 18:00:58 2017 -0800
Committer: Samarth <sa...@salesforce.com>
Committed: Tue Feb 28 18:00:58 2017 -0800
----------------------------------------------------------------------
.../phoenix/end2end/CsvBulkLoadToolIT.java | 36 ++++++++++++++
.../mapreduce/FormatToBytesWritableMapper.java | 51 ++++++++++++--------
.../mapreduce/FormatToKeyValueReducer.java | 44 +++++++++++------
.../flume/serializer/CsvEventSerializer.java | 2 +-
4 files changed, 97 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5f5662b2/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 9103bd8..5a186a0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -372,4 +372,40 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
assertTrue(ex instanceof FileAlreadyExistsException);
}
}
+
+ @Test
+ public void testImportInImmutableTable() throws Exception {
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE IMMUTABLE TABLE S.TABLE10 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE, CF1.T2 DATE, CF2.T3 DATE) ");
+
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+ FSDataOutputStream outputStream = fs.create(new Path("/tmp/input10.csv"));
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.println("1,Name 1,1970/01/01,1970/02/01,1970/03/01");
+ printWriter.println("2,Name 2,1970/01/02,1970/02/02,1970/03/02");
+ printWriter.close();
+ CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+ csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration()));
+ csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB, "yyyy/MM/dd");
+ int exitCode = csvBulkLoadTool.run(new String[] { "--input", "/tmp/input10.csv", "--table", "table10",
+ "--schema", "s", "--zookeeper", zkQuorum });
+ assertEquals(0, exitCode);
+ ResultSet rs = stmt.executeQuery("SELECT id, name, t, CF1.T2, CF2.T3 FROM s.table10 ORDER BY id");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals("Name 1", rs.getString(2));
+ assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3));
+ assertEquals(DateUtil.parseDate("1970-02-01"), rs.getDate(4));
+ assertEquals(DateUtil.parseDate("1970-03-01"), rs.getDate(5));
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals("Name 2", rs.getString(2));
+ assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3));
+ assertEquals(DateUtil.parseDate("1970-02-02"), rs.getDate(4));
+ assertEquals(DateUtil.parseDate("1970-03-02"), rs.getDate(5));
+ assertFalse(rs.next());
+
+ rs.close();
+ stmt.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5f5662b2/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index 278489d..1dae981 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -47,7 +47,9 @@ import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -211,30 +213,41 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
private void initColumnIndexes() throws SQLException {
columnIndexes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
int columnIndex = 0;
- for(int index = 0; index < logicalNames.size(); index++) {
+ for (int index = 0; index < logicalNames.size(); index++) {
PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
- List<PColumn> cls = table.getColumns();
- for (int i = 0; i < cls.size(); i++) {
- PColumn c = cls.get(i);
- byte[] family = new byte[0];
- byte[] cq;
- if (!SchemaUtil.isPKColumn(c)) {
- family = c.getFamilyName().getBytes();
- cq = c.getColumnQualifierBytes();
- } else {
- cq = c.getName().getBytes();
- }
- byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
- if (!columnIndexes.containsKey(cfn)) {
+ if (!table.getImmutableStorageScheme().equals(ImmutableStorageScheme.ONE_CELL_PER_COLUMN)) {
+ List<PColumnFamily> cfs = table.getColumnFamilies();
+ for (int i = 0; i < cfs.size(); i++) {
+ byte[] family = cfs.get(i).getName().getBytes();
+ byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES,
+ QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES);
columnIndexes.put(cfn, new Integer(columnIndex));
columnIndex++;
}
+ } else {
+ List<PColumn> cls = table.getColumns();
+ for (int i = 0; i < cls.size(); i++) {
+ PColumn c = cls.get(i);
+ byte[] family = new byte[0];
+ byte[] cq;
+ if (!SchemaUtil.isPKColumn(c)) {
+ family = c.getFamilyName().getBytes();
+ cq = c.getColumnQualifierBytes();
+ } else {
+ cq = c.getName().getBytes();
+ }
+ byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
+ if (!columnIndexes.containsKey(cfn)) {
+ columnIndexes.put(cfn, new Integer(columnIndex));
+ columnIndex++;
+ }
+ }
+ byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
+ byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+ byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue);
+ columnIndexes.put(cfn, new Integer(columnIndex));
+ columnIndex++;
}
- byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
- byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
- byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue);
- columnIndexes.put(cfn, new Integer(columnIndex));
- columnIndex++;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5f5662b2/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index c529afe..07cf285 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -42,7 +42,9 @@ import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -95,25 +97,35 @@ public class FormatToKeyValueReducer
int columnIndex = 0;
for (int index = 0; index < logicalNames.size(); index++) {
PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
- List<PColumn> cls = table.getColumns();
- for (int i = 0; i < cls.size(); i++) {
- PColumn c = cls.get(i);
- byte[] family = new byte[0];
- byte[] cq;
- if (!SchemaUtil.isPKColumn(c)) {
- family = c.getFamilyName().getBytes();
- cq = c.getColumnQualifierBytes();
- } else {
- // TODO: samarth verify if this is the right thing to do here.
- cq = c.getName().getBytes();
- }
- byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
- Pair<byte[], byte[]> pair = new Pair<>(family, cq);
- if (!indexMap.containsKey(cfn)) {
- indexMap.put(cfn, new Integer(columnIndex));
+ if (!table.getImmutableStorageScheme().equals(ImmutableStorageScheme.ONE_CELL_PER_COLUMN)) {
+ List<PColumnFamily> cfs = table.getColumnFamilies();
+ for (int i = 0; i < cfs.size(); i++) {
+ byte[] family = cfs.get(i).getName().getBytes();
+ Pair<byte[], byte[]> pair = new Pair<>(family,
+ QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES);
columnIndexes.put(new Integer(columnIndex), pair);
columnIndex++;
}
+ } else {
+ List<PColumn> cls = table.getColumns();
+ for (int i = 0; i < cls.size(); i++) {
+ PColumn c = cls.get(i);
+ byte[] family = new byte[0];
+ byte[] cq;
+ if (!SchemaUtil.isPKColumn(c)) {
+ family = c.getFamilyName().getBytes();
+ cq = c.getColumnQualifierBytes();
+ } else {
+ cq = c.getName().getBytes();
+ }
+ byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
+ Pair<byte[], byte[]> pair = new Pair<>(family, cq);
+ if (!indexMap.containsKey(cfn)) {
+ indexMap.put(cfn, new Integer(columnIndex));
+ columnIndexes.put(new Integer(columnIndex), pair);
+ columnIndex++;
+ }
+ }
}
byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5f5662b2/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
index 1521084..a856c3e 100644
--- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
+++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
@@ -189,7 +189,7 @@ public class CsvEventSerializer extends BaseEventSerializer {
public CSVRecord parse(String input) throws IOException {
CSVParser csvParser = new CSVParser(new StringReader(input), this.csvFormat);
- return ((CSVRecord) Iterables.getFirst(csvParser, null));
+ return Iterables.getFirst(csvParser, null);
}
}