You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2018/02/01 00:06:54 UTC
[22/35] phoenix git commit: PHOENIX-4415 Ignore CURRENT_SCN property
if set in Pig Storer
PHOENIX-4415 Ignore CURRENT_SCN property if set in Pig Storer
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2c4ca690
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2c4ca690
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2c4ca690
Branch: refs/heads/4.x-cdh5.11.2
Commit: 2c4ca6900ae1c4f43e293aa0096393356dd3bbfa
Parents: cc44562
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Nov 8 03:13:53 2017 +0000
Committer: Pedro Boado <pb...@apache.org>
Committed: Wed Jan 31 22:24:48 2018 +0000
----------------------------------------------------------------------
.../phoenix/mapreduce/PhoenixOutputFormat.java | 13 ++++++++++-
.../phoenix/mapreduce/PhoenixRecordWriter.java | 8 ++++++-
.../phoenix/mapreduce/util/ConnectionUtil.java | 23 ++++++++++++++++----
.../org/apache/phoenix/util/PropertiesUtil.java | 9 +++++++-
.../java/org/apache/phoenix/pig/BasePigIT.java | 4 ++++
.../apache/phoenix/pig/PhoenixHBaseStorage.java | 12 ++++++----
6 files changed, 58 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c4ca690/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
index e55b977..4217e40 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
@@ -19,6 +19,8 @@ package org.apache.phoenix.mapreduce;
import java.io.IOException;
import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,6 +38,15 @@ import org.apache.hadoop.mapreduce.lib.db.DBWritable;
*/
public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<NullWritable,T> {
private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class);
+ private final Set<String> propsToIgnore;
+
+ public PhoenixOutputFormat() {
+ this(Collections.<String>emptySet());
+ }
+
+ public PhoenixOutputFormat(Set<String> propsToIgnore) {
+ this.propsToIgnore = propsToIgnore;
+ }
@Override
public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
@@ -52,7 +63,7 @@ public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<Nul
@Override
public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
try {
- return new PhoenixRecordWriter<T>(context.getConfiguration());
+ return new PhoenixRecordWriter<T>(context.getConfiguration(), propsToIgnore);
} catch (SQLException e) {
LOG.error("Error calling PhoenixRecordWriter " + e.getMessage());
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c4ca690/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
index 70ee3f5..52f2fe3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,7 +48,11 @@ public class PhoenixRecordWriter<T extends DBWritable> extends RecordWriter<Nul
private long numRecords = 0;
public PhoenixRecordWriter(final Configuration configuration) throws SQLException {
- this.conn = ConnectionUtil.getOutputConnection(configuration);
+ this(configuration, Collections.<String>emptySet());
+ }
+
+ public PhoenixRecordWriter(final Configuration configuration, Set<String> propsToIgnore) throws SQLException {
+ this.conn = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
this.statement = this.conn.prepareStatement(upsertQuery);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c4ca690/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
index ada3816..56a5ef5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
@@ -20,15 +20,16 @@ package org.apache.phoenix.mapreduce.util;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.Collections;
import java.util.Properties;
+import java.util.Set;
-import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
+import com.google.common.base.Preconditions;
+
/**
* Utility class to return a {@link Connection} .
*/
@@ -74,15 +75,29 @@ public class ConnectionUtil {
* Create the configured output Connection.
*
* @param conf configuration containing the connection information
+ * @return the configured output connection
+ */
+ public static Connection getOutputConnectionWithoutTheseProps(final Configuration conf, Set<String> ignoreTheseProps) throws SQLException {
+ return getOutputConnection(conf, new Properties(), ignoreTheseProps);
+ }
+
+ /**
+ * Create the configured output Connection.
+ *
+ * @param conf configuration containing the connection information
* @param props custom connection properties
* @return the configured output connection
*/
public static Connection getOutputConnection(final Configuration conf, Properties props) throws SQLException {
+ return getOutputConnection(conf, props, Collections.<String>emptySet());
+ }
+
+ public static Connection getOutputConnection(final Configuration conf, Properties props, Set<String> withoutTheseProps) throws SQLException {
Preconditions.checkNotNull(conf);
return getConnection(PhoenixConfigurationUtil.getOutputCluster(conf),
PhoenixConfigurationUtil.getClientPort(conf),
PhoenixConfigurationUtil.getZNodeParent(conf),
- PropertiesUtil.combineProperties(props, conf));
+ PropertiesUtil.combineProperties(props, conf, withoutTheseProps));
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c4ca690/phoenix-core/src/main/java/org/apache/phoenix/util/PropertiesUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PropertiesUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PropertiesUtil.java
index f6eb5c5..685b8cb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PropertiesUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PropertiesUtil.java
@@ -17,10 +17,13 @@
*/
package org.apache.phoenix.util;
+import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
+import java.util.Set;
+
import org.apache.hadoop.conf.Configuration;
public class PropertiesUtil {
@@ -50,13 +53,17 @@ public class PropertiesUtil {
* properties contained in conf
*/
public static Properties combineProperties(Properties props, final Configuration conf) {
+ return combineProperties(props, conf, Collections.<String>emptySet());
+ }
+
+ public static Properties combineProperties(Properties props, final Configuration conf, Set<String> withoutTheseProps) {
Iterator<Map.Entry<String, String>> iterator = conf.iterator();
Properties copy = deepCopy(props);
if (iterator != null) {
while (iterator.hasNext()) {
Map.Entry<String, String> entry = iterator.next();
// set the property from config only if props doesn't have it already
- if (copy.getProperty(entry.getKey()) == null) {
+ if (copy.getProperty(entry.getKey()) == null && !withoutTheseProps.contains(entry.getKey())) {
copy.setProperty(entry.getKey(), entry.getValue());
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c4ca690/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java
index 94ccc25..4de9854 100644
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java
@@ -29,8 +29,10 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.pig.ExecType;
@@ -62,6 +64,8 @@ public class BasePigIT extends BaseHBaseManagedTimeIT {
public void setUp() throws Exception {
conf = getTestClusterConfig();
conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+ // Set CURRENT_SCN to confirm that it's ignored
+ conf.set(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(System.currentTimeMillis()+QueryConstants.MILLIS_IN_DAY));
pigServer = new PigServer(ExecType.LOCAL, conf);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
conn = DriverManager.getConnection(getUrl(), props);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c4ca690/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
index a9f0c8f..e061c1c 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
@@ -21,6 +21,9 @@ import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -43,6 +46,7 @@ import org.apache.phoenix.pig.util.TableSchemaParserFunction;
import org.apache.phoenix.pig.util.TypeUtil;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.StoreFuncInterface;
@@ -87,14 +91,15 @@ import org.slf4j.LoggerFactory;
public class PhoenixHBaseStorage implements StoreFuncInterface {
private static final Logger LOG = LoggerFactory.getLogger(PhoenixHBaseStorage.class);
-
+ private static final Set<String> PROPS_TO_IGNORE = new HashSet<>(Arrays.asList(PhoenixRuntime.CURRENT_SCN_ATTRIB));
+
private Configuration config;
private RecordWriter<NullWritable, PhoenixRecordWritable> writer;
private List<ColumnInfo> columnInfo = null;
private String contextSignature = null;
private ResourceSchema schema;
private long batchSize;
- private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat();
+ private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat<PhoenixRecordWritable>(PROPS_TO_IGNORE);
// Set of options permitted
private final static Options validOptions = new Options();
private final static CommandLineParser parser = new GnuParser();
@@ -228,5 +233,4 @@ public class PhoenixHBaseStorage implements StoreFuncInterface {
schema = s;
getUDFProperties().setProperty(contextSignature + SCHEMA, ObjectSerializer.serialize(schema));
}
-
-}
\ No newline at end of file
+}