You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/11/04 22:13:38 UTC
[11/50] [abbrv] phoenix git commit: PHOENIX-6 Support ON DUPLICATE
KEY construct
PHOENIX-6 Support ON DUPLICATE KEY construct
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e2325a41
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e2325a41
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e2325a41
Branch: refs/heads/encodecolumns2
Commit: e2325a413d2b44f1432b30b7fd337643793cbd21
Parents: 613a5b7
Author: James Taylor <ja...@apache.org>
Authored: Thu Oct 27 11:20:20 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu Oct 27 14:03:28 2016 -0700
----------------------------------------------------------------------
.../phoenix/end2end/OnDuplicateKeyIT.java | 523 +++++++++++++++++++
.../phoenix/end2end/index/IndexTestUtil.java | 6 +-
.../org/apache/phoenix/tx/TransactionIT.java | 15 +
phoenix-core/src/main/antlr3/PhoenixSQL.g | 24 +-
.../apache/phoenix/compile/DeleteCompiler.java | 6 +-
.../apache/phoenix/compile/UpsertCompiler.java | 104 +++-
.../UngroupedAggregateRegionObserver.java | 2 +-
.../phoenix/exception/SQLExceptionCode.java | 6 +
.../apache/phoenix/execute/MutationState.java | 32 +-
.../org/apache/phoenix/hbase/index/Indexer.java | 98 +++-
.../hbase/index/builder/BaseIndexBuilder.java | 14 +-
.../hbase/index/builder/IndexBuildManager.java | 10 +
.../hbase/index/builder/IndexBuilder.java | 29 +-
.../phoenix/hbase/index/covered/IndexCodec.java | 1 -
.../hbase/index/util/KeyValueBuilder.java | 15 +-
.../phoenix/index/PhoenixIndexBuilder.java | 318 +++++++++++
.../apache/phoenix/jdbc/PhoenixStatement.java | 11 +-
.../apache/phoenix/parse/ParseNodeFactory.java | 7 +-
.../apache/phoenix/parse/UpsertStatement.java | 10 +-
.../apache/phoenix/schema/DelegateColumn.java | 10 +
.../apache/phoenix/schema/DelegateTable.java | 18 +-
.../org/apache/phoenix/schema/PColumnImpl.java | 12 +-
.../java/org/apache/phoenix/schema/PRow.java | 11 +-
.../java/org/apache/phoenix/schema/PTable.java | 6 +-
.../org/apache/phoenix/schema/PTableImpl.java | 48 +-
.../org/apache/phoenix/util/ExpressionUtil.java | 1 -
.../phoenix/compile/QueryCompilerTest.java | 104 +++-
27 files changed, 1321 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
new file mode 100644
index 0000000..9a81026
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+
+@RunWith(Parameterized.class)
+public class OnDuplicateKeyIT extends ParallelStatsDisabledIT {
+ private final String indexDDL;
+
+ public OnDuplicateKeyIT(String indexDDL) {
+ this.indexDDL = indexDDL;
+ }
+
+ @Parameters
+ public static Collection<Object> data() {
+ List<Object> testCases = Lists.newArrayList();
+ testCases.add(new String[] {
+ "",
+ });
+ testCases.add(new String[] {
+ "create index %s_IDX on %s(counter1) include (counter2)",
+ });
+ testCases.add(new String[] {
+ "create index %s_IDX on %s(counter1, counter2)",
+ });
+ testCases.add(new String[] {
+ "create local index %s_IDX on %s(counter1) include (counter2)",
+ });
+ testCases.add(new String[] {
+ "create local index %s_IDX on %s(counter1, counter2)",
+ });
+ return testCases;
+ }
+
+ private void createIndex(Connection conn, String tableName) throws SQLException {
+ if (indexDDL == null || indexDDL.length() == 0) {
+ return;
+ }
+ String ddl = String.format(indexDDL, tableName, tableName);
+ conn.createStatement().execute(ddl);
+ }
+
+ @Test
+ public void testNewAndUpdateOnSingleNumericColumn() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 smallint)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+ String dml = "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1";
+ conn.createStatement().execute(dml);
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(0,rs.getLong(2));
+ assertFalse(rs.next());
+
+ conn.createStatement().execute(dml);
+ conn.commit();
+
+ rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(1,rs.getLong(2));
+ assertFalse(rs.next());
+
+ conn.close();
+ }
+
+ @Test
+ public void testNewAndUpdateOnSingleNumericColumnWithOtherColumns() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ String ddl = " create table " + tableName + "(k1 varchar, k2 varchar, counter1 varchar, counter2 date, other1 char(3), other2 varchar default 'f', constraint pk primary key (k1,k2))";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+ String dml = "UPSERT INTO " + tableName + " VALUES('a','b','c',null,'eee') " +
+ "ON DUPLICATE KEY UPDATE counter1 = counter1 || CASE WHEN LENGTH(counter1) < 10 THEN 'SMALL' ELSE 'LARGE' END || k2 || other2 || other1 ";
+ conn.createStatement().execute(dml);
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals("b",rs.getString(2));
+ assertEquals("c",rs.getString(3));
+ assertEquals(null,rs.getDate(4));
+ assertEquals("eee",rs.getString(5));
+ assertEquals("f",rs.getString(6));
+ assertFalse(rs.next());
+
+ conn.createStatement().execute(dml);
+ conn.commit();
+
+ rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals("b",rs.getString(2));
+ assertEquals("cSMALLbfeee",rs.getString(3));
+ assertEquals(null,rs.getDate(4));
+ assertEquals("eee",rs.getString(5));
+ assertEquals("f",rs.getString(6));
+ assertFalse(rs.next());
+
+ conn.createStatement().execute(dml);
+ conn.commit();
+
+ rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals("b",rs.getString(2));
+ assertEquals("cSMALLbfeeeLARGEbfeee",rs.getString(3));
+ assertEquals(null,rs.getDate(4));
+ assertEquals("eee",rs.getString(5));
+ assertEquals("f",rs.getString(6));
+ assertFalse(rs.next());
+
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a','b','c',null,'eee') " +
+ "ON DUPLICATE KEY UPDATE counter1 = to_char(rand()), counter2 = current_date() + 1");
+ conn.commit();
+
+ rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals("b",rs.getString(2));
+ double d = Double.parseDouble(rs.getString(3));
+ assertTrue(d >= 0.0 && d <= 1.0);
+ Date date = rs.getDate(4);
+ assertTrue(date.after(new Date(System.currentTimeMillis())));
+ assertEquals("eee",rs.getString(5));
+ assertEquals("f",rs.getString(6));
+ assertFalse(rs.next());
+
+ conn.close();
+ }
+
+ @Test
+ public void testNewAndUpdateOnSingleVarcharColumn() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ String ddl = " create table " + tableName + "(pk varchar primary key, counter1 varchar, counter2 smallint)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+ String dml = "UPSERT INTO " + tableName + " VALUES('a','b') ON DUPLICATE KEY UPDATE counter1 = counter1 || 'b'";
+ conn.createStatement().execute(dml);
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE substr(counter1,1,1) = 'b'");
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals("b",rs.getString(2));
+ assertFalse(rs.next());
+
+ conn.createStatement().execute(dml);
+ conn.commit();
+
+ rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE substr(counter1,1,1) = 'b'");
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals("bb",rs.getString(2));
+ assertFalse(rs.next());
+
+ conn.close();
+ }
+
+ @Test
+ public void testDeleteOnSingleVarcharColumnAutoCommit() throws Exception {
+ testDeleteOnSingleVarcharColumn(true);
+ }
+
+ @Test
+ public void testDeleteOnSingleVarcharColumnNoAutoCommit() throws Exception {
+ testDeleteOnSingleVarcharColumn(false);
+ }
+
+ private void testDeleteOnSingleVarcharColumn(boolean autoCommit) throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(autoCommit);
+ String tableName = generateUniqueName();
+ String ddl = " create table " + tableName + "(pk varchar primary key, counter1 varchar, counter2 smallint)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+ String dml = "UPSERT INTO " + tableName + " VALUES('a','b') ON DUPLICATE KEY UPDATE counter1 = null";
+ conn.createStatement().execute(dml);
+ conn.createStatement().execute(dml);
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(null,rs.getString(2));
+ assertFalse(rs.next());
+
+ dml = "UPSERT INTO " + tableName + " VALUES('a','b',0)";
+ conn.createStatement().execute(dml);
+ dml = "UPSERT INTO " + tableName + " VALUES('a','b', 0) ON DUPLICATE KEY UPDATE counter1 = null, counter2 = counter2 + 1";
+ conn.createStatement().execute(dml);
+ dml = "UPSERT INTO " + tableName + " VALUES('a','b', 0) ON DUPLICATE KEY UPDATE counter1 = 'c', counter2 = counter2 + 1";
+ conn.createStatement().execute(dml);
+ conn.commit();
+
+ rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals("c",rs.getString(2));
+ assertEquals(2,rs.getInt(3));
+ assertFalse(rs.next());
+
+ conn.close();
+ }
+
+ @Test
+ public void testIgnoreOnSingleColumn() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 bigint)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10)");
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(10,rs.getLong(2));
+ assertFalse(rs.next());
+
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY IGNORE");
+ conn.commit();
+
+ rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(10,rs.getLong(2));
+ assertFalse(rs.next());
+
+ conn.close();
+ }
+
+ @Test
+ public void testInitialIgnoreWithUpdateOnSingleColumn() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 bigint)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+ // Test ignore combined with update in same commit batch for new record
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10) ON DUPLICATE KEY IGNORE");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(11,rs.getLong(2));
+ assertFalse(rs.next());
+
+ conn.close();
+ }
+
+ @Test
+ public void testOverrideOnDupKeyUpdateWithUpsert() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 bigint)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+ // Test upsert overriding ON DUPLICATE KEY entries
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',1) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',2) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10)");
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(10,rs.getLong(2));
+ assertFalse(rs.next());
+
+ conn.close();
+ }
+
+ @Test
+ public void testNewAndMultiUpdateOnSingleColumnAutoCommit() throws Exception {
+ testNewAndMultiUpdateOnSingleColumn(true);
+ }
+
+ @Test
+ public void testNewAndMultiUpdateOnSingleColumnNoAutoCommit() throws Exception {
+ testNewAndMultiUpdateOnSingleColumn(false);
+ }
+
+ private void testNewAndMultiUpdateOnSingleColumn(boolean autoCommit) throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(autoCommit);
+ String tableName = generateUniqueName();
+ String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 integer)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',5) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"); // VALUES ignored
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY IGNORE"); // no impact
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"); // VALUES ignored
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(2,rs.getLong(2));
+ assertFalse(rs.next());
+
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 2");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 2");
+ conn.commit();
+
+ rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(9,rs.getLong(2));
+ assertFalse(rs.next());
+
+ conn.close();
+ }
+
+ @Test
+ public void testNewAndMultiDifferentUpdateOnSingleColumn() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 decimal)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+ String dml = "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1";
+ conn.createStatement().execute(dml);
+ dml = "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 2";
+ conn.createStatement().execute(dml);
+ dml = "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1";
+ conn.createStatement().execute(dml);
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(3,rs.getLong(2));
+ assertFalse(rs.next());
+
+ conn.close();
+ }
+
+ @Test
+ public void testNewAndMultiDifferentUpdateOnMultipleColumnsAutoCommit() throws Exception {
+ testNewAndMultiDifferentUpdateOnMultipleColumns(true);
+ }
+
+ @Test
+ public void testNewAndMultiDifferentUpdateOnMultipleColumnsNoAutoCommit() throws Exception {
+ testNewAndMultiDifferentUpdateOnMultipleColumns(false);
+ }
+
+ private void testNewAndMultiDifferentUpdateOnMultipleColumns(boolean autoCommit) throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(autoCommit);
+ String tableName = generateUniqueName();
+ String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 tinyint)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+ String dml = "UPSERT INTO " + tableName + " VALUES('a',0,0) ON DUPLICATE KEY UPDATE counter1 = counter2 + 1, counter2 = counter1 + 2";
+ conn.createStatement().execute(dml);
+ conn.createStatement().execute(dml);
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(1,rs.getLong(2));
+ assertEquals(2,rs.getLong(3));
+ assertFalse(rs.next());
+
+ rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName);
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(1,rs.getLong(2));
+ assertEquals(2,rs.getLong(3));
+ assertFalse(rs.next());
+
+ conn.createStatement().execute(dml);
+ conn.commit();
+
+ rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(3,rs.getLong(2));
+ assertEquals(3,rs.getLong(3));
+ assertFalse(rs.next());
+
+ rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName);
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(3,rs.getLong(2));
+ assertEquals(3,rs.getLong(3));
+ assertFalse(rs.next());
+
+ conn.close();
+ }
+
+ @Test
+ public void testAtomicUpdate() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ final String tableName = generateUniqueName();
+ String ddl = " create table " + tableName + "(pk varchar primary key, counter1 integer, counter2 integer)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+ int nThreads = 10;
+ final int[] resultHolder = new int[1];
+ final int nCommits = 100;
+ final int nIncrementsPerCommit = 2;
+ ExecutorService exec = Executors.newFixedThreadPool(nThreads);
+ List<Future> futures = Lists.newArrayListWithExpectedSize(nThreads);
+ Connection[] connections = new Connection[nThreads];
+ for (int i = 0; i < nThreads; i++) {
+ connections[i] = DriverManager.getConnection(getUrl(), props);
+ }
+ for (int i = 0; i < nThreads; i++) {
+ final Connection myConn = connections[i];
+ futures.add(exec.submit(new Runnable() {
+ @Override
+ public void run() {
+ String dml = "UPSERT INTO " + tableName + " VALUES('a',1) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1";
+ try {
+ for (int j = 0; j < nCommits; j++) {
+ for (int k = 0; k < nIncrementsPerCommit; k++) {
+ myConn.createStatement().execute(dml);
+ resultHolder[0]++;
+ }
+ myConn.commit();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }));
+ }
+ Collections.shuffle(futures);
+ for (Future future : futures) {
+ future.get();
+ }
+ exec.shutdownNow();
+
+ int finalResult = nThreads * nCommits * nIncrementsPerCommit;
+ //assertEquals(finalResult,resultHolder[0]);
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(finalResult,rs.getInt(2));
+ assertFalse(rs.next());
+
+ rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName + " WHERE counter1 >= 0");
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(finalResult,rs.getInt(2));
+ assertFalse(rs.next());
+
+ conn.close();
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
index ba04ad7..e854f23 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
@@ -43,11 +43,11 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnFamily;
-import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -125,7 +125,7 @@ public class IndexTestUtil {
long ts = MetaDataUtil.getClientTimeStamp(dataMutation);
if (dataMutation instanceof Delete && dataMutation.getFamilyCellMap().values().isEmpty()) {
indexTable.newKey(ptr, indexValues);
- row = indexTable.newRow(builder, ts, ptr);
+ row = indexTable.newRow(builder, ts, ptr, false);
row.delete();
} else {
// If no column families in table, then nothing to look for
@@ -153,7 +153,7 @@ public class IndexTestUtil {
}
}
indexTable.newKey(ptr, indexValues);
- row = indexTable.newRow(builder, ts, ptr);
+ row = indexTable.newRow(builder, ts, ptr, false);
int pos = 0;
while ((pos = indexValuesSet.nextSetBit(pos)) >= 0) {
int index = nIndexColumns + indexOffset + pos++;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index 2e45d5a..83128f1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -698,4 +698,19 @@ public class TransactionIT extends ParallelStatsDisabledIT {
}
}
+
+
+ @Test
+ public void testOnDupKeyForTransactionalTable() throws Exception {
+ // TODO: we should support having a transactional table defined for a connectionless connection
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String transactTableName = generateUniqueName();
+ conn.createStatement().execute("CREATE TABLE " + transactTableName + " (k integer not null primary key, v bigint) TRANSACTIONAL=true");
+ conn.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1");
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL.getErrorCode(), e.getErrorCode());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index fa1e9db..1d1a873 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -135,6 +135,8 @@ tokens
EXECUTE = 'execute';
UPGRADE = 'upgrade';
DEFAULT = 'default';
+ DUPLICATE = 'duplicate';
+ IGNORE = 'ignore';
}
@@ -707,10 +709,26 @@ finally{ contextStack.pop(); }
upsert_node returns [UpsertStatement ret]
: UPSERT (hint=hintClause)? INTO t=from_table_name
(LPAREN p=upsert_column_refs RPAREN)?
- ((VALUES LPAREN v=one_or_more_expressions RPAREN) | s=select_node)
- {ret = factory.upsert(factory.namedTable(null,t,p == null ? null : p.getFirst()), hint, p == null ? null : p.getSecond(), v, s, getBindCount(), new HashMap<String, UDFParseNode>(udfParseNodes)); }
- ;
+ ((VALUES LPAREN v=one_or_more_expressions RPAREN ( ON DUPLICATE KEY ( ig=IGNORE | ( UPDATE pairs=update_column_pairs ) ) )? ) | s=select_node)
+ {ret = factory.upsert(
+ factory.namedTable(null,t,p == null ? null : p.getFirst()),
+ hint, p == null ? null : p.getSecond(),
+ v, s, getBindCount(),
+ new HashMap<String, UDFParseNode>(udfParseNodes),
+ ig != null ? Collections.<Pair<ColumnName,ParseNode>>emptyList() : pairs != null ? pairs : null); }
+ ;
+
+update_column_pairs returns [ List<Pair<ColumnName,ParseNode>> ret]
+@init{ret = new ArrayList<Pair<ColumnName,ParseNode>>(); }
+ : p=update_column_pair { ret.add(p); }
+ (COMMA p=update_column_pair { ret.add(p); } )*
+;
+
+update_column_pair returns [ Pair<ColumnName,ParseNode> ret ]
+ : c=column_name EQ e=expression { $ret = new Pair<ColumnName,ParseNode>(c,e); }
+;
+
upsert_column_refs returns [Pair<List<ColumnDef>,List<ColumnName>> ret]
@init{ret = new Pair<List<ColumnDef>,List<ColumnName>>(new ArrayList<ColumnDef>(), new ArrayList<ColumnName>()); }
: d=dyn_column_name_or_def { if (d.getDataType()!=null) { $ret.getFirst().add(d); } $ret.getSecond().add(d.getColumnDefName()); }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index e0881cf..602cd6b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -159,11 +159,11 @@ public class DeleteCompiler {
}
// When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the
// row key will already have its value.
- mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
+ mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
for (int i = 0; i < indexTableRefs.size(); i++) {
ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
rs.getCurrentRow().getKey(indexPtr);
- indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
+ indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
}
if (mutations.size() > maxSize) {
throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
@@ -499,7 +499,7 @@ public class DeleteCompiler {
Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator();
Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
while (iterator.hasNext()) {
- mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
+ mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
}
return new MutationState(tableRef, mutation, 0, maxSize, connection);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 1caf7be..85517a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -27,6 +27,7 @@ import java.sql.Timestamp;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -52,6 +54,7 @@ import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexBuilder;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -75,6 +78,7 @@ import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.ConstraintViolationException;
+import org.apache.phoenix.schema.DelegateColumn;
import org.apache.phoenix.schema.IllegalDataException;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
@@ -96,6 +100,7 @@ import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.schema.types.PUnsignedLong;
+import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
@@ -107,10 +112,11 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
public class UpsertCompiler {
+
private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes,
PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation,
PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer,
- byte[][] viewConstants) throws SQLException {
+ byte[][] viewConstants, byte[] onDupKeyBytes) throws SQLException {
Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
byte[][] pkValues = new byte[table.getPKColumns().size()][];
// If the table uses salting, the first byte is the salting byte, set to an empty array
@@ -154,7 +160,7 @@ public class UpsertCompiler {
ptr.set(ScanRanges.prefixKey(ptr.get(), 0, regionPrefix, regionPrefix.length));
}
}
- mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo));
+ mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
}
private static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
@@ -208,7 +214,7 @@ public class UpsertCompiler {
table.rowKeyOrderOptimizable());
values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
}
- setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants);
+ setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, null);
rowCount++;
// Commit a batch if auto commit is true and we're at our batch size
if (isAutoCommit && rowCount % batchSize == 0) {
@@ -869,6 +875,85 @@ public class UpsertCompiler {
constantExpressions.add(expression);
nodeIndex++;
}
+ byte[] onDupKeyBytesToBe = null;
+ List<Pair<ColumnName,ParseNode>> onDupKeyPairs = upsert.getOnDupKeyPairs();
+ if (onDupKeyPairs != null) {
+ if (table.isImmutableRows()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE)
+ .setSchemaName(table.getSchemaName().getString())
+ .setTableName(table.getTableName().getString())
+ .build().buildException();
+ }
+ if (table.isTransactional()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL)
+ .setSchemaName(table.getSchemaName().getString())
+ .setTableName(table.getTableName().getString())
+ .build().buildException();
+ }
+ if (connection.getSCN() != null) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_SCN_IN_ON_DUP_KEY)
+ .setSchemaName(table.getSchemaName().getString())
+ .setTableName(table.getTableName().getString())
+ .build().buildException();
+ }
+ if (onDupKeyPairs.isEmpty()) { // ON DUPLICATE KEY IGNORE
+ onDupKeyBytesToBe = PhoenixIndexBuilder.serializeOnDupKeyIgnore();
+ } else { // ON DUPLICATE KEY UPDATE
+ int position = 1;
+ UpdateColumnCompiler compiler = new UpdateColumnCompiler(context);
+ int nColumns = onDupKeyPairs.size();
+ List<Expression> updateExpressions = Lists.newArrayListWithExpectedSize(nColumns);
+ LinkedHashSet<PColumn>updateColumns = Sets.newLinkedHashSetWithExpectedSize(nColumns + 1);
+ updateColumns.add(new PColumnImpl(
+ table.getPKColumns().get(0).getName(), // Use first PK column name as we know it won't conflict with others
+ null, PVarbinary.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false));
+ for (Pair<ColumnName,ParseNode> columnPair : onDupKeyPairs) {
+ ColumnName colName = columnPair.getFirst();
+ PColumn updateColumn = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName()).getColumn();
+ if (SchemaUtil.isPKColumn(updateColumn)) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_UPDATE_PK_ON_DUP_KEY)
+ .setSchemaName(table.getSchemaName().getString())
+ .setTableName(table.getTableName().getString())
+ .setColumnName(updateColumn.getName().getString())
+ .build().buildException();
+ }
+ final int columnPosition = position++;
+ if (!updateColumns.add(new DelegateColumn(updateColumn) {
+ @Override
+ public int getPosition() {
+ return columnPosition;
+ }
+ })) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.DUPLICATE_COLUMN_IN_ON_DUP_KEY)
+ .setSchemaName(table.getSchemaName().getString())
+ .setTableName(table.getTableName().getString())
+ .setColumnName(updateColumn.getName().getString())
+ .build().buildException();
+ };
+ ParseNode updateNode = columnPair.getSecond();
+ compiler.setColumn(updateColumn);
+ Expression updateExpression = updateNode.accept(compiler);
+ // Check that updateExpression is coercible to updateColumn
+ if (updateExpression.getDataType() != null && !updateExpression.getDataType().isCastableTo(updateColumn.getDataType())) {
+ throw TypeMismatchException.newException(
+ updateExpression.getDataType(), updateColumn.getDataType(), "expression: "
+ + updateExpression.toString() + " for column " + updateColumn);
+ }
+ if (compiler.isAggregate()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.AGGREGATION_NOT_ALLOWED_IN_ON_DUP_KEY)
+ .setSchemaName(table.getSchemaName().getString())
+ .setTableName(table.getTableName().getString())
+ .setColumnName(updateColumn.getName().getString())
+ .build().buildException();
+ }
+ updateExpressions.add(updateExpression);
+ }
+ PTable onDupKeyTable = PTableImpl.makePTable(table, updateColumns);
+ onDupKeyBytesToBe = PhoenixIndexBuilder.serializeOnDupKeyUpdate(onDupKeyTable, updateExpressions);
+ }
+ }
+ final byte[] onDupKeyBytes = onDupKeyBytesToBe;
+
return new MutationPlan() {
@Override
public ParameterMetaData getParameterMetaData() {
@@ -958,7 +1043,7 @@ public class UpsertCompiler {
indexMaintainer = table.getIndexMaintainer(parentTable, connection);
viewConstants = IndexUtil.getViewConstants(parentTable);
}
- setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants);
+ setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes);
return new MutationState(tableRef, mutation, 0, maxSize, connection);
}
@@ -1004,10 +1089,10 @@ public class UpsertCompiler {
return upsertRef;
}
- private static final class UpsertValuesCompiler extends ExpressionCompiler {
+ private static class UpdateColumnCompiler extends ExpressionCompiler {
private PColumn column;
- private UpsertValuesCompiler(StatementContext context) {
+ private UpdateColumnCompiler(StatementContext context) {
super(context);
}
@@ -1032,7 +1117,12 @@ public class UpsertCompiler {
}
return super.visit(node);
}
-
+ }
+
+ private static class UpsertValuesCompiler extends UpdateColumnCompiler {
+ private UpsertValuesCompiler(StatementContext context) {
+ super(context);
+ }
@Override
public Expression visit(SequenceValueParseNode node) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index f09a20f..9fd59ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -535,7 +535,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
projectedTable.newKey(ptr, values);
- PRow row = projectedTable.newRow(kvBuilder, ts, ptr);
+ PRow row = projectedTable.newRow(kvBuilder, ts, ptr, false);
for (; i < projectedColumns.size(); i++) {
Expression expression = selectExpressions.get(i);
if (expression.evaluate(result, ptr)) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 2346224..ac5619f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -326,6 +326,12 @@ public enum SQLExceptionCode {
return new SequenceNotFoundException(info.getSchemaName(), info.getTableName());
}
}),
+ CANNOT_UPDATE_PK_ON_DUP_KEY(1218, "42Z18", "Primary key columns may not be udpated in ON DUPLICATE KEY UPDATE clause." ),
+ CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE(1219, "42Z19", "The ON DUPLICATE KEY UPDATE clause may not be used for immutable tables." ),
+ CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL(1220, "42Z20", "The ON DUPLICATE KEY UPDATE clause may not be used for transactional tables." ),
+ DUPLICATE_COLUMN_IN_ON_DUP_KEY(1221, "42Z21", "Duplicate column in ON DUPLICATE KEY UPDATE." ),
+ AGGREGATION_NOT_ALLOWED_IN_ON_DUP_KEY(1222, "42Z22", "Aggregation in ON DUPLICATE KEY UPDATE is not allowed." ),
+ CANNOT_SET_SCN_IN_ON_DUP_KEY(1223, "42Z23", "The CURRENT_SCN may not be set for statement using ON DUPLICATE KEY." ),
/** Parser error. (errorcode 06, sqlState 42P) */
PARSER_ERROR(601, "42P00", "Syntax error.", Factory.SYNTAX_ERROR),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 262f263..9d1344b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -55,6 +55,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.IndexMetaDataCacheClient;
+import org.apache.phoenix.index.PhoenixIndexBuilder;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
@@ -620,6 +621,8 @@ public class MutationState implements SQLCloseable {
long timestampToUse = timestamp;
while (iterator.hasNext()) {
Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry = iterator.next();
+ byte[] onDupKeyBytes = rowEntry.getValue().getOnDupKeyBytes();
+ boolean hasOnDupKey = onDupKeyBytes != null;
ImmutableBytesPtr key = rowEntry.getKey();
RowMutationState state = rowEntry.getValue();
if (tableWithRowTimestampCol) {
@@ -635,7 +638,7 @@ public class MutationState implements SQLCloseable {
}
PRow row =
tableRef.getTable()
- .newRow(connection.getKeyValueBuilder(), timestampToUse, key);
+ .newRow(connection.getKeyValueBuilder(), timestampToUse, key, hasOnDupKey);
List<Mutation> rowMutations, rowMutationsPertainingToIndex;
if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete
row.delete();
@@ -650,6 +653,15 @@ public class MutationState implements SQLCloseable {
row.setValue(valueEntry.getKey(), valueEntry.getValue());
}
rowMutations = row.toRowMutations();
+ // Pass through ON DUPLICATE KEY info through mutations
+ // In the case of the same clause being used on many statements, this will be
+ // inefficient because we're transmitting the same information for each mutation.
+ // TODO: use our ServerCache
+ for (Mutation mutation : rowMutations) {
+ if (onDupKeyBytes != null) {
+ mutation.setAttribute(PhoenixIndexBuilder.ATOMIC_OP_ATTRIB, onDupKeyBytes);
+ }
+ }
rowMutationsPertainingToIndex = rowMutations;
}
mutationList.addAll(rowMutations);
@@ -1452,15 +1464,22 @@ public class MutationState implements SQLCloseable {
@Nonnull private Map<PColumn,byte[]> columnValues;
private int[] statementIndexes;
@Nonnull private final RowTimestampColInfo rowTsColInfo;
+ private byte[] onDupKeyBytes;
- public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo) {
+ public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo,
+ byte[] onDupKeyBytes) {
checkNotNull(columnValues);
checkNotNull(rowTsColInfo);
this.columnValues = columnValues;
this.statementIndexes = new int[] {statementIndex};
this.rowTsColInfo = rowTsColInfo;
+ this.onDupKeyBytes = onDupKeyBytes;
}
+ byte[] getOnDupKeyBytes() {
+ return onDupKeyBytes;
+ }
+
Map<PColumn, byte[]> getColumnValues() {
return columnValues;
}
@@ -1470,7 +1489,14 @@ public class MutationState implements SQLCloseable {
}
void join(RowMutationState newRow) {
- getColumnValues().putAll(newRow.getColumnValues());
+ // If we already have a row and the new row has an ON DUPLICATE KEY clause
+ // ignore the new values (as that's what the server will do).
+ if (newRow.onDupKeyBytes == null) {
+ getColumnValues().putAll(newRow.getColumnValues());
+ }
+ // Concatenate ON DUPLICATE KEY bytes to allow multiple
+ // increments of the same row in the same commit batch.
+ this.onDupKeyBytes = PhoenixIndexBuilder.combineOnDupKey(this.onDupKeyBytes, newRow.onDupKeyBytes);
statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index eb5d3a8..84c8d7d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,23 +33,28 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.UserGroupInformation;
@@ -61,15 +67,16 @@ import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.hbase.index.wal.IndexedKeyValue;
import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.ServerUtil;
import org.cloudera.htrace.Span;
import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceScope;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
+
import com.google.common.collect.Multimap;
/**
@@ -189,6 +196,45 @@ public class Indexer extends BaseRegionObserver {
this.recoveryWriter.stop(msg);
}
+ /**
+ * We use an Increment to serialize the ON DUPLICATE KEY clause so that the HBase plumbing
+ * sets up the necessary locks and mvcc to allow an atomic update. The Increment is not a
+ * real increment, though, it's really more of a Put. We translate the Increment into a
+ * list of mutations, at most a single Put and Delete that are the changes upon executing
+ * the list of ON DUPLICATE KEY clauses for this row.
+ */
+ @Override
+ public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Increment inc) throws IOException {
+ try {
+ List<Mutation> mutations = this.builder.executeAtomicOp(inc);
+ if (mutations == null) {
+ return null;
+ }
+
+ // Causes the Increment to be ignored as we're committing the mutations
+ // ourselves below.
+ e.bypass();
+ e.complete();
+ // ON DUPLICATE KEY IGNORE will return empty list if row already exists
+ // as no action is required in that case.
+ if (!mutations.isEmpty()) {
+ HRegion region = e.getEnvironment().getRegion();
+ // Otherwise, submit the mutations directly here
+ region.mutateRowsWithLocks(
+ mutations,
+ Collections.<byte[]>emptyList(), // Rows are already locked
+ HConstants.NO_NONCE, HConstants.NO_NONCE);
+ }
+ return Result.EMPTY_RESULT;
+ } catch (Throwable t) {
+ throw ServerUtil.createIOException(
+ "Unable to process ON DUPLICATE IGNORE for " +
+ e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString() +
+ "(" + Bytes.toStringBinary(inc.getRow()) + ")", t);
+ }
+ }
+
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
@@ -206,13 +252,15 @@ public class Indexer extends BaseRegionObserver {
"Somehow didn't return an index update but also didn't propagate the failure to the client!");
}
+ private static final OperationStatus SUCCESS = new OperationStatus(OperationStatusCode.SUCCESS);
+
public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
// first group all the updates for a single row into a single update to be processed
Map<ImmutableBytesPtr, MultiMutation> mutations =
new HashMap<ImmutableBytesPtr, MultiMutation>();
-
+
Durability defaultDurability = Durability.SYNC_WAL;
if(c.getEnvironment().getRegion() != null) {
defaultDurability = c.getEnvironment().getRegion().getTableDesc().getDurability();
@@ -222,33 +270,35 @@ public class Indexer extends BaseRegionObserver {
Durability durability = Durability.SKIP_WAL;
for (int i = 0; i < miniBatchOp.size(); i++) {
Mutation m = miniBatchOp.getOperation(i);
+ if (this.builder.isAtomicOp(m)) {
+ miniBatchOp.setOperationStatus(i, SUCCESS);
+ continue;
+ }
// skip this mutation if we aren't enabling indexing
// unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
// should be indexed, which means we need to expose another method on the builder. Such is the
// way optimization go though.
- if (!this.builder.isEnabled(m)) {
- continue;
- }
-
- Durability effectiveDurablity = (m.getDurability() == Durability.USE_DEFAULT) ?
- defaultDurability : m.getDurability();
- if (effectiveDurablity.ordinal() > durability.ordinal()) {
- durability = effectiveDurablity;
- }
-
- // add the mutation to the batch set
- ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
- MultiMutation stored = mutations.get(row);
- // we haven't seen this row before, so add it
- if (stored == null) {
- stored = new MultiMutation(row);
- mutations.put(row, stored);
+ if (this.builder.isEnabled(m)) {
+ Durability effectiveDurablity = (m.getDurability() == Durability.USE_DEFAULT) ?
+ defaultDurability : m.getDurability();
+ if (effectiveDurablity.ordinal() > durability.ordinal()) {
+ durability = effectiveDurablity;
+ }
+
+ // add the mutation to the batch set
+ ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+ MultiMutation stored = mutations.get(row);
+ // we haven't seen this row before, so add it
+ if (stored == null) {
+ stored = new MultiMutation(row);
+ mutations.put(row, stored);
+ }
+ stored.addAll(m);
}
- stored.addAll(m);
}
// early exit if it turns out we don't have any edits
- if (mutations.entrySet().size() == 0) {
+ if (mutations.isEmpty()) {
return;
}
@@ -360,7 +410,7 @@ public class Indexer extends BaseRegionObserver {
private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates)
throws Exception {
//short circuit, if we don't need to do any work
- if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m)) {
+ if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m) || edit == null) {
// already did the index update in prePut, so we are done
return;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index 4e329e9..b9174b8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -12,17 +12,19 @@ package org.apache.phoenix.hbase.index.builder;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Collection;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.IndexCodec;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
/**
@@ -91,6 +93,16 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
return this.codec.isEnabled(m);
}
+ @Override
+ public boolean isAtomicOp(Mutation m) throws IOException {
+ return false;
+ }
+
+ @Override
+ public List<Mutation> executeAtomicOp(Increment inc) throws IOException {
+ return null;
+ }
+
/**
* Exposed for testing!
*
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index f411b8e..325904d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -178,6 +179,14 @@ public class IndexBuildManager implements Stoppable {
return delegate.isEnabled(m);
}
+ public boolean isAtomicOp(Mutation m) throws IOException {
+ return delegate.isAtomicOp(m);
+ }
+
+ public List<Mutation> executeAtomicOp(Increment inc) throws IOException {
+ return delegate.executeAtomicOp(inc);
+ }
+
@Override
public void stop(String why) {
if (stopped) {
@@ -196,4 +205,5 @@ public class IndexBuildManager implements Stoppable {
public IndexBuilder getBuilderForTesting() {
return this.delegate;
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index 36aba77..dff205a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -19,11 +19,13 @@ package org.apache.phoenix.hbase.index.builder;
import java.io.IOException;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -65,19 +67,10 @@ public interface IndexBuilder extends Stoppable {
* Implementers must ensure that this method is thread-safe - it could (and probably will) be
* called concurrently for different mutations, which may or may not be part of the same batch.
* @param mutation update to the primary table to be indexed.
- * @param context TODO
+ * @param context index meta data for the mutation
* @return a Map of the mutations to make -> target index table name
* @throws IOException on failure
*/
- /* TODO:
- Create BaseIndexBuilder with everything except getIndexUpdate().
- Derive two concrete classes: NonTxIndexBuilder and TxIndexBuilder.
- NonTxIndexBuilder will be current impl of this method.
- TxIndexBuilder will use a scan with skipScan over TxAwareHBase to find the latest values.
- Conditionally don't do WALEdit stuff for txnal table (ensure Phoenix/HBase tolerates index WAl edit info not being there)
- Noop Failure mode
- */
-
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context) throws IOException;
/**
@@ -139,4 +132,20 @@ public interface IndexBuilder extends Stoppable {
* @throws IOException
*/
public boolean isEnabled(Mutation m) throws IOException;
+
+ /**
+ * True if mutation has an ON DUPLICATE KEY clause
+ * @param m mutation
+ * @return true if mutation has ON DUPLICATE KEY expression and false otherwise.
+ * @throws IOException
+ */
+ public boolean isAtomicOp(Mutation m) throws IOException;
+
+ /**
+ * Calculate the mutations based on the ON DUPLICATE KEY clause
+ * @param inc increment to run against
+ * @return list of mutations as a result of executing the ON DUPLICATE KEY clause
+ * or null if Increment does not represent an ON DUPLICATE KEY clause.
+ */
+ public List<Mutation> executeAtomicOp(Increment inc) throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
index 93de11e..e6d683e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
@@ -23,7 +23,6 @@ import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
* added to the codec, as well as potentially not haivng to implement some methods.
*/
public interface IndexCodec {
-
/**
* Do any code initialization necessary
*
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java
index e3bd7a8..741bf87 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java
@@ -18,9 +18,11 @@
package org.apache.phoenix.hbase.index.util;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
@@ -40,13 +42,14 @@ public abstract class KeyValueBuilder {
* @throws RuntimeException if there is an IOException thrown from the underlying {@link Put}
*/
@SuppressWarnings("javadoc")
- public static void addQuietly(Put put, KeyValueBuilder builder, KeyValue kv) {
- try {
- put.add(kv);
- } catch (IOException e) {
- throw new RuntimeException("KeyValue Builder " + builder + " created an invalid kv: "
- + kv + "!");
+ public static void addQuietly(Mutation m, KeyValueBuilder builder, KeyValue kv) {
+ byte [] family = CellUtil.cloneFamily(kv);
+ List<Cell> list = m.getFamilyCellMap().get(family);
+ if (list == null) {
+ list = new ArrayList<Cell>();
+ m.getFamilyCellMap().put(family, list);
}
+ list.add(kv);
}
/**