You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/02/24 22:13:32 UTC
[14/50] [abbrv] phoenix git commit: PHOENIX-2334 CSV Bulk load fails
on local indexes(Rajeshbabu)
PHOENIX-2334 CSV Bulk load fails on local indexes(Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/decbfe30
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/decbfe30
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/decbfe30
Branch: refs/heads/calcite
Commit: decbfe3062bbc970050e03fbb198e61a2d30e88c
Parents: c48fee0
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Thu Feb 11 02:48:05 2016 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Thu Feb 11 02:48:05 2016 +0530
----------------------------------------------------------------------
.../phoenix/end2end/CsvBulkLoadToolIT.java | 27 +++++++++++---------
.../phoenix/mapreduce/AbstractBulkLoadTool.java | 16 ++++++++----
.../mapreduce/bulkload/TargetTableRef.java | 2 +-
.../phoenix/query/ConnectionQueryServices.java | 1 +
.../query/ConnectionQueryServicesImpl.java | 27 ++++++++++++++++++++
.../query/ConnectionlessQueryServicesImpl.java | 15 +++++++++++
.../query/DelegateConnectionQueryServices.java | 6 +++++
.../java/org/apache/phoenix/util/IndexUtil.java | 10 +++++++-
8 files changed, 85 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 26ec889..96042c5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -218,7 +218,7 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterHBaseManagedTimeIT {
Statement stmt = conn.createStatement();
stmt.execute("CREATE TABLE TABLE6 (ID INTEGER NOT NULL PRIMARY KEY, " +
- "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+ "FIRST_NAME VARCHAR, LAST_NAME VARCHAR) SPLIt ON (1,2)");
String ddl = "CREATE LOCAL INDEX TABLE6_IDX ON TABLE6 "
+ " (FIRST_NAME ASC)";
stmt.execute(ddl);
@@ -234,16 +234,19 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterHBaseManagedTimeIT {
CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
csvBulkLoadTool.setConf(getUtility().getConfiguration());
- try {
- csvBulkLoadTool.run(new String[] {
- "--input", "/tmp/input3.csv",
- "--table", "table6",
- "--zookeeper", zkQuorum});
- fail("Csv bulk load currently has issues with local indexes.");
- } catch( UnsupportedOperationException ise) {
- assertEquals("Local indexes not supported by Bulk Loader",ise.getMessage());
- }
-
+ int exitCode = csvBulkLoadTool.run(new String[] {
+ "--input", "/tmp/input3.csv",
+ "--table", "table6",
+ "--zookeeper", zkQuorum});
+ assertEquals(0, exitCode);
+
+ ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE6 where first_name='FirstName 2'");
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals("FirstName 2", rs.getString(2));
+
+ rs.close();
+ stmt.close();
}
@Test
@@ -251,7 +254,7 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterHBaseManagedTimeIT {
testImportOneIndexTable("TABLE4", false);
}
- //@Test
+ @Test
public void testImportOneLocalIndexTable() throws Exception {
testImportOneIndexTable("TABLE5", true);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index f6ba5f6..39ee4b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -21,8 +21,10 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import org.apache.commons.cli.CommandLine;
@@ -54,6 +56,7 @@ import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -295,7 +298,12 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
}
private void completebulkload(Configuration conf,Path outputPath , List<TargetTableRef> tablesToBeLoaded) throws Exception {
+ Set<String> tableNames = new HashSet<>(tablesToBeLoaded.size());
for(TargetTableRef table : tablesToBeLoaded) {
+ if(tableNames.contains(table.getPhysicalName())){
+ continue;
+ }
+ tableNames.add(table.getPhysicalName());
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
String tableName = table.getPhysicalName();
Path tableOutputPath = new Path(outputPath,tableName);
@@ -382,11 +390,9 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>();
for(PTable indexTable : table.getIndexes()){
if (indexTable.getIndexType() == PTable.IndexType.LOCAL) {
- throw new UnsupportedOperationException("Local indexes not supported by Bulk Loader");
- /*indexTables.add(
- new TargetTableRef(getQualifiedTableName(schemaName,
- indexTable.getTableName().getString()),
- MetaDataUtil.getLocalIndexTableName(qualifiedTableName))); */
+ indexTables.add(new TargetTableRef(getQualifiedTableName(schemaName, indexTable
+ .getTableName().getString()), MetaDataUtil
+ .getLocalIndexTableName(qualifiedTableName)));
} else {
indexTables.add(new TargetTableRef(getQualifiedTableName(schemaName,
indexTable.getTableName().getString())));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java
index 1a846f9..2c3069f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java
@@ -46,7 +46,7 @@ public class TargetTableRef {
}
@JsonCreator
- private TargetTableRef(@JsonProperty("logicalName") String logicalName,
+ public TargetTableRef(@JsonProperty("logicalName") String logicalName,
@JsonProperty("physicalName") String physicalName) {
this.logicalName = logicalName;
this.physicalName = physicalName;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 25d7ff4..b5f1f85 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -69,6 +69,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
public HTableDescriptor getTableDescriptor(byte[] tableName) throws SQLException;
+ public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) throws SQLException;
public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException;
public PhoenixConnection connect(String url, Properties info) throws SQLException;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index b29e3d9..897c207 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -3368,4 +3368,31 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public boolean isRenewingLeasesEnabled() {
return supportsFeature(ConnectionQueryServices.Feature.RENEW_LEASE) && renewLeaseEnabled;
}
+
+
+ @Override
+ public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) throws SQLException {
+ /*
+ * Use HConnection.getRegionLocation as it uses the cache in HConnection, to get the region
+ * to which specified row belongs to.
+ */
+ int retryCount = 0, maxRetryCount = 1;
+ boolean reload =false;
+ while (true) {
+ try {
+ return connection.getRegionLocation(TableName.valueOf(tableName), row, reload);
+ } catch (org.apache.hadoop.hbase.TableNotFoundException e) {
+ String fullName = Bytes.toString(tableName);
+ throw new TableNotFoundException(SchemaUtil.getSchemaNameFromFullName(fullName), SchemaUtil.getTableNameFromFullName(fullName));
+ } catch (IOException e) {
+ if (retryCount++ < maxRetryCount) { // One retry, in case split occurs while navigating
+ reload = true;
+ continue;
+ }
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.GET_TABLE_REGIONS_FAIL)
+ .setRootCause(e).build().buildException();
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 6cfb382..b4bbe1f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -577,4 +577,19 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
public boolean isRenewingLeasesEnabled() {
return false;
}
+
+ public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) throws SQLException {
+ List<HRegionLocation> regions = tableSplits.get(Bytes.toString(tableName));
+ if (regions != null) {
+ for(HRegionLocation region: regions) {
+ if (Bytes.compareTo(region.getRegionInfo().getStartKey(), row) <= 0
+ && Bytes.compareTo(region.getRegionInfo().getEndKey(), row) > 0) {
+ return region;
+ }
+ }
+ }
+ return new HRegionLocation(
+ new HRegionInfo(TableName.valueOf(tableName), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW),
+ SERVER_NAME, -1);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 9b721f8..4c7446b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -304,4 +304,10 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
public boolean isRenewingLeasesEnabled() {
return getDelegate().isRenewingLeasesEnabled();
}
+
+ @Override
+ public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row)
+ throws SQLException {
+ return getDelegate().getTableRegionLocation(tableName, row);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index f361fb9..98b88f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
@@ -283,7 +284,14 @@ public class IndexUtil {
}
};
- indexMutations.add(maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, ts, null, null));
+ byte[] regionStartKey = null;
+ byte[] regionEndkey = null;
+ if(maintainer.isLocalIndex()) {
+ HRegionLocation tableRegionLocation = connection.getQueryServices().getTableRegionLocation(table.getName().getBytes(), dataMutation.getRow());
+ regionStartKey = tableRegionLocation.getRegionInfo().getStartKey();
+ regionEndkey = tableRegionLocation.getRegionInfo().getEndKey();
+ }
+ indexMutations.add(maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, ts, regionStartKey, regionEndkey));
}
}
return indexMutations;