You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by jy...@apache.org on 2016/02/12 21:17:43 UTC
phoenix git commit: PHOENIX-2674 PhoenixMapReduceUtil#setInput
doesn't honor condition clause
Repository: phoenix
Updated Branches:
refs/heads/master 0c1fd3ad5 -> 8ece81b55
PHOENIX-2674 PhoenixMapReduceUtil#setInput doesn't honor condition clause
Set the condition clause in PhoenixMapReduceUtil#setInput,
and clean up some duplicated code in the setInput()
overloads. Add a test that covers MapReduce jobs
with and without a condition.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8ece81b5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8ece81b5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8ece81b5
Branch: refs/heads/master
Commit: 8ece81b5522df3e6bd9dfdb3112e101215bb49f1
Parents: 0c1fd3a
Author: Jesse Yates <jy...@apache.org>
Authored: Wed Feb 10 12:46:47 2016 -0800
Committer: Jesse Yates <jy...@apache.org>
Committed: Fri Feb 12 12:15:42 2016 -0800
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/MapReduceIT.java | 230 +++++++++++++++++++
.../mapreduce/util/PhoenixMapReduceUtil.java | 65 +++---
2 files changed, 264 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece81b5/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
new file mode 100644
index 0000000..f030701
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.schema.types.PDouble;
+import org.apache.phoenix.schema.types.PhoenixArray;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.*;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test that our MapReduce basic tools work as expected
+ */
+public class MapReduceIT extends BaseHBaseManagedTimeIT {
+
+ private static final String STOCK_TABLE_NAME = "stock";
+ private static final String STOCK_STATS_TABLE_NAME = "stock_stats";
+ private static final String STOCK_NAME = "STOCK_NAME";
+ private static final String RECORDING_YEAR = "RECORDING_YEAR";
+ private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";
+ private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE_NAME + " ( " +
+ STOCK_NAME + " VARCHAR NOT NULL ," + RECORDING_YEAR + " INTEGER NOT NULL, " + RECORDINGS_QUARTER +
+ " DOUBLE array[] CONSTRAINT pk PRIMARY KEY (" + STOCK_NAME + " , " + RECORDING_YEAR + "))";
+
+ private static final String MAX_RECORDING = "MAX_RECORDING";
+ private static final String CREATE_STOCK_STATS_TABLE =
+ "CREATE TABLE IF NOT EXISTS " + STOCK_STATS_TABLE_NAME + "(" + STOCK_NAME + " VARCHAR NOT NULL , "
+ + MAX_RECORDING + " DOUBLE CONSTRAINT pk PRIMARY KEY (" + STOCK_NAME + "))";
+ private static final String UPSERT = "UPSERT into " + STOCK_TABLE_NAME + " values (?, ?, ?)";
+
+ @Before
+ public void setupTables() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute(CREATE_STOCK_TABLE);
+ conn.createStatement().execute(CREATE_STOCK_STATS_TABLE);
+ conn.commit();
+ }
+
+ @Test
+ public void testNoConditionsOnSelect() throws Exception {
+ final Configuration conf = getUtility().getConfiguration();
+ Job job = Job.getInstance(conf);
+ PhoenixMapReduceUtil.setInput(job, StockWritable.class, STOCK_TABLE_NAME, null,
+ STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+ testJob(job, 91.04);
+ }
+
+ @Test
+ public void testConditionsOnSelect() throws Exception {
+ final Configuration conf = getUtility().getConfiguration();
+ Job job = Job.getInstance(conf);
+ PhoenixMapReduceUtil.setInput(job, StockWritable.class, STOCK_TABLE_NAME, RECORDING_YEAR+" < 2009",
+ STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+ testJob(job, 81.04);
+ }
+
+ private void testJob(Job job, double expectedMax)
+ throws SQLException, InterruptedException, IOException, ClassNotFoundException {
+ upsertData();
+
+ // only run locally, rather than having to spin up a MiniMapReduce cluster and lets us use breakpoints
+ job.getConfiguration().set("mapreduce.framework.name", "local");
+ setOutput(job);
+
+ job.setMapperClass(StockMapper.class);
+ job.setReducerClass(StockReducer.class);
+ job.setOutputFormatClass(PhoenixOutputFormat.class);
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(DoubleWritable.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(StockWritable.class);
+
+ // run job
+ assertTrue("Job didn't complete successfully! Check logs for reason.", job.waitForCompletion(true));
+
+ // verify
+ ResultSet stats = DriverManager.getConnection(getUrl()).createStatement()
+ .executeQuery("SELECT * FROM " + STOCK_STATS_TABLE_NAME);
+ assertTrue("No data stored in stats table!", stats.next());
+ String name = stats.getString(1);
+ double max = stats.getDouble(2);
+ assertEquals("Got the wrong stock name!", "AAPL", name);
+ assertEquals("Max value didn't match the expected!", expectedMax, max, 0);
+ assertFalse("Should only have stored one row in stats table!", stats.next());
+ }
+
+ /**
+ * Custom output setting because output upsert statement setting is broken (PHOENIX-2677)
+ *
+ * @param job to update
+ */
+ private void setOutput(Job job) {
+ final Configuration configuration = job.getConfiguration();
+ PhoenixConfigurationUtil.setOutputTableName(configuration, STOCK_STATS_TABLE_NAME);
+ configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, "UPSERT into " + STOCK_STATS_TABLE_NAME +
+ " (" + STOCK_NAME + ", " + MAX_RECORDING + ") values (?,?)");
+ job.setOutputFormatClass(PhoenixOutputFormat.class);
+ }
+
+ private void upsertData() throws SQLException {
+ Connection conn = DriverManager.getConnection(getUrl());
+ PreparedStatement stmt = conn.prepareStatement(UPSERT);
+ upsertData(stmt, "AAPL", 2009, new Double[]{85.88, 91.04, 88.5, 90.3});
+ upsertData(stmt, "AAPL", 2008, new Double[]{75.88, 81.04, 78.5, 80.3});
+ conn.commit();
+ }
+
+ private void upsertData(PreparedStatement stmt, String name, int year, Double[] data) throws SQLException {
+ int i = 1;
+ stmt.setString(i++, name);
+ stmt.setInt(i++, year);
+ Array recordings = new PhoenixArray.PrimitiveDoublePhoenixArray(PDouble.INSTANCE, data);
+ stmt.setArray(i++, recordings);
+ stmt.execute();
+ }
+
+ public static class StockWritable implements DBWritable {
+
+ private String stockName;
+ private double[] recordings;
+ private double maxPrice;
+
+ @Override
+ public void readFields(ResultSet rs) throws SQLException {
+ stockName = rs.getString(STOCK_NAME);
+ final Array recordingsArray = rs.getArray(RECORDINGS_QUARTER);
+ recordings = (double[]) recordingsArray.getArray();
+ }
+
+ @Override
+ public void write(PreparedStatement pstmt) throws SQLException {
+ pstmt.setString(1, stockName);
+ pstmt.setDouble(2, maxPrice);
+ }
+
+ public double[] getRecordings() {
+ return recordings;
+ }
+
+ public String getStockName() {
+ return stockName;
+ }
+
+ public void setStockName(String stockName) {
+ this.stockName = stockName;
+ }
+
+ public void setMaxPrice(double maxPrice) {
+ this.maxPrice = maxPrice;
+ }
+ }
+
+ /**
+ * Extract the max price for each stock recording
+ */
+ public static class StockMapper extends Mapper<NullWritable, StockWritable, Text, DoubleWritable> {
+
+ private Text stock = new Text();
+ private DoubleWritable price = new DoubleWritable();
+
+ @Override
+ protected void map(NullWritable key, StockWritable stockWritable, Context context)
+ throws IOException, InterruptedException {
+ double[] recordings = stockWritable.getRecordings();
+ final String stockName = stockWritable.getStockName();
+ double maxPrice = Double.MIN_VALUE;
+ for (double recording : recordings) {
+ if (maxPrice < recording) {
+ maxPrice = recording;
+ }
+ }
+ stock.set(stockName);
+ price.set(maxPrice);
+ context.write(stock, price);
+ }
+ }
+
+ /**
+ * Store the max price seen for each stock
+ */
+ public static class StockReducer extends Reducer<Text, DoubleWritable, NullWritable, StockWritable> {
+
+ @Override
+ protected void reduce(Text key, Iterable<DoubleWritable> recordings, Context context)
+ throws IOException, InterruptedException {
+ double maxPrice = Double.MIN_VALUE;
+ for (DoubleWritable recording : recordings) {
+ if (maxPrice < recording.get()) {
+ maxPrice = recording.get();
+ }
+ }
+ final StockWritable stock = new StockWritable();
+ stock.setStockName(key.toString());
+ stock.setMaxPrice(maxPrice);
+ context.write(NullWritable.get(), stock);
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece81b5/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index f52c860..125c6a8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -30,43 +30,46 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
public final class PhoenixMapReduceUtil {
private PhoenixMapReduceUtil() {
-
+ // Static utility class; not meant to be instantiated.
}
-
+
/**
- *
+ * Configures {@code job} to read from a Phoenix table via {@link PhoenixInputFormat}, projecting the
+ * given columns and optionally restricting rows with a WHERE-clause condition.
+ *
* @param job
* @param inputClass DBWritable class
* @param tableName Input table name
- * @param conditions Condition clause to be added to the WHERE clause.
+ * @param conditions Condition clause to be added to the WHERE clause. Can be <tt>null</tt> if there are no conditions.
* @param fieldNames fields being projected for the SELECT query.
*/
- public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName , final String conditions, final String... fieldNames) {
- job.setInputFormatClass(PhoenixInputFormat.class);
- final Configuration configuration = job.getConfiguration();
- PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
- PhoenixConfigurationUtil.setSelectColumnNames(configuration,fieldNames);
- PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
- PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.TABLE);
+ public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName,
+ final String conditions, final String... fieldNames) {
+ final Configuration configuration = setInput(job, inputClass, tableName);
+ if(conditions != null) {
+ PhoenixConfigurationUtil.setInputTableConditions(configuration, conditions);
+ }
+ PhoenixConfigurationUtil.setSelectColumnNames(configuration, fieldNames);
+ // The shared helper sets no schema type; set TABLE explicitly (as before this refactor) so a
+ // configuration previously set up by the query overload does not keep SchemaType.QUERY.
+ PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.TABLE);
}
-
+
/**
- *
- * @param job
- * @param inputClass DBWritable class
+ * Configures {@code job} to read the rows returned by an explicit SELECT query via {@link PhoenixInputFormat}.
+ *
+ * @param job MapReduce job to configure
+ * @param inputClass DBWritable class
* @param tableName Input table name
* @param inputQuery Select query.
*/
public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName, final String inputQuery) {
- job.setInputFormatClass(PhoenixInputFormat.class);
- final Configuration configuration = job.getConfiguration();
- PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
- PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
- PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
+ final Configuration configuration = setInput(job, inputClass, tableName);
+ // The shared helper does not record the query; without this call the inputQuery argument is ignored.
+ PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
-
}
-
+
+ /**
+ * Applies the settings common to every {@code setInput} overload: the {@link PhoenixInputFormat}
+ * input format, the input table name and the {@link DBWritable} input class. It does not set a
+ * schema type; callers are responsible for that.
+ *
+ * @return the job's configuration, for further customisation by the caller
+ */
+ private static Configuration setInput(final Job job, final Class<? extends DBWritable> inputClass,
+ final String tableName) {
+ final Configuration conf = job.getConfiguration();
+ job.setInputFormatClass(PhoenixInputFormat.class);
+ PhoenixConfigurationUtil.setInputClass(conf, inputClass);
+ PhoenixConfigurationUtil.setInputTableName(conf, tableName);
+ return conf;
+ }
+
/**
* A method to override which HBase cluster for {@link PhoenixInputFormat} to read from
* @param job MapReduce Job
@@ -77,10 +80,10 @@ public final class PhoenixMapReduceUtil {
PhoenixConfigurationUtil.setInputCluster(configuration, quorum);
}
/**
- *
+ *
* @param job
- * @param outputClass
- * @param tableName Output table
+ * @param outputClass
+ * @param tableName Output table
* @param columns List of columns separated by ,
*/
public static void setOutput(final Job job, final String tableName,final String columns) {
@@ -89,13 +92,13 @@ public final class PhoenixMapReduceUtil {
PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
PhoenixConfigurationUtil.setUpsertColumnNames(configuration,columns.split(","));
}
-
-
+
+
/**
- *
+ *
* @param job
* @param outputClass
- * @param tableName Output table
+ * @param tableName Output table
* @param fieldNames fields
*/
public static void setOutput(final Job job, final String tableName , final String... fieldNames) {
@@ -104,7 +107,7 @@ public final class PhoenixMapReduceUtil {
PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
PhoenixConfigurationUtil.setUpsertColumnNames(configuration,fieldNames);
}
-
+
/**
* A method to override which HBase cluster for {@link PhoenixOutputFormat} to write to
* @param job MapReduce Job
@@ -115,5 +118,5 @@ public final class PhoenixMapReduceUtil {
PhoenixConfigurationUtil.setOutputCluster(configuration, quorum);
}
-
+
}