Posted to dev@phoenix.apache.org by "Jan Fernando (JIRA)" <ji...@apache.org> on 2014/07/17 02:34:05 UTC

[jira] [Commented] (PHOENIX-1096) Duplicate sequence values returned when doing upsert select against a salted table.

    [ https://issues.apache.org/jira/browse/PHOENIX-1096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14064401#comment-14064401 ] 

Jan Fernando commented on PHOENIX-1096:
---------------------------------------

I think I have found the root cause, but I am still investigating. The issue appears to be that SequenceValueExpression is cached, and the byte[] valueBuffer that we write the sequence value into as we loop over the rows from the select part of the Upsert...Select is a member field and is not thread safe. What I observed is different threads writing to this buffer concurrently, causing multiple threads to see the same sequence number. Moving the byte[] valueBuffer to a local variable results in the correct number of mutations being generated and upserted into the target table.

Here's the initial change I made that appears to address this. I am running into some other ordering issues, though, so I don't want to declare victory yet. The change I have so far is below for reference. This is not an official patch request; I just wanted to capture my findings to date.

--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
@@ -174,7 +174,7 @@ public class SequenceManager {

     private class SequenceValueExpression extends BaseTerminalExpression {
         private final int index;
-        private final byte[] valueBuffer = new byte[PDataType.LONG.getByteSize()];


         private SequenceValueExpression(int index) {
             this.index = index;
@@ -186,6 +186,7 @@ public class SequenceManager {

         @Override
         public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+            byte[] valueBuffer = new byte[PDataType.LONG.getByteSize()];
             PDataType.LONG.getCodec().encodeLong(tuple.getSequenceValue(index), valueBuffer, 0);
             ptr.set(valueBuffer);
             return true;
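
For illustration only, the aliasing behind this can be reproduced outside Phoenix. The sketch below is not Phoenix code: Ptr is a hypothetical stand-in for ImmutableBytesWritable (it keeps a reference to the array rather than a copy), and the two expression classes are hypothetical simplifications of SequenceValueExpression before and after the change. It is deliberately single-threaded so the result is deterministic; with concurrent threads the same overwrite would happen at unpredictable times.

```java
import java.nio.ByteBuffer;

public class SharedBufferSketch {

    /** Hypothetical stand-in for ImmutableBytesWritable: stores a reference, not a copy. */
    static final class Ptr {
        private byte[] bytes;

        void set(byte[] bytes) {
            this.bytes = bytes;
        }

        long asLong() {
            return ByteBuffer.wrap(bytes).getLong();
        }
    }

    /** Shape before the change: every evaluate() call writes into one member buffer. */
    static final class SharedBufferExpression {
        private final byte[] valueBuffer = new byte[Long.BYTES];

        void evaluate(long sequenceValue, Ptr ptr) {
            ByteBuffer.wrap(valueBuffer).putLong(sequenceValue);
            ptr.set(valueBuffer);
        }
    }

    /** Shape after the change: every evaluate() call allocates its own buffer. */
    static final class LocalBufferExpression {
        void evaluate(long sequenceValue, Ptr ptr) {
            byte[] valueBuffer = new byte[Long.BYTES];
            ByteBuffer.wrap(valueBuffer).putLong(sequenceValue);
            ptr.set(valueBuffer);
        }
    }

    /** Evaluates sequence values 1 and 2 for two "rows" and reads both pointers afterwards. */
    public static long[] evaluateTwoRows(boolean sharedBuffer) {
        Ptr first = new Ptr();
        Ptr second = new Ptr();
        if (sharedBuffer) {
            SharedBufferExpression expression = new SharedBufferExpression();
            expression.evaluate(1L, first);
            expression.evaluate(2L, second); // overwrites the bytes 'first' still points at
        } else {
            LocalBufferExpression expression = new LocalBufferExpression();
            expression.evaluate(1L, first);
            expression.evaluate(2L, second);
        }
        return new long[] { first.asLong(), second.asLong() };
    }

    public static void main(String[] args) {
        long[] shared = evaluateTwoRows(true);
        long[] local = evaluateTwoRows(false);
        System.out.println("shared buffer: " + shared[0] + ", " + shared[1]);
        System.out.println("local buffer:  " + local[0] + ", " + local[1]);
    }
}
```

With the shared buffer both pointers read back 2, which is the duplicate; with a buffer allocated per call they read back 1 and 2.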

> Duplicate sequence values returned when doing upsert select against a salted table.
> -----------------------------------------------------------------------------------
>
>                 Key: PHOENIX-1096
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-1096
>             Project: Phoenix
>          Issue Type: Bug
>            Reporter: Samarth Jain
>             Fix For: 3.0.0, 4.0.0, 5.0.0
>
>
> {code}
> @Test
>     public void testUpsertSelectWithSequenceAndLargeDataSet() throws Exception {
>         long ts = nextTimestamp();
>         Properties props = new Properties();
>         //props.setProperty(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(64));
>         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
>         Connection conn = DriverManager.getConnection(getUrl(), props);
>         String ddl = "CREATE TABLE IF NOT EXISTS DUMMY_CURSOR_STORAGE ("
>         + "ORGANIZATION_ID CHAR(15) NOT NULL, QUERY_ID CHAR(15) NOT NULL, CURSOR_ORDER BIGINT NOT NULL "
>         + "CONSTRAINT MAIN_PK PRIMARY KEY (ORGANIZATION_ID, QUERY_ID, CURSOR_ORDER) "
>         + ") SALT_BUCKETS = 64";
>         conn.createStatement().execute(ddl);
>         conn.createStatement().execute("CREATE TABLE DUMMY_SEQ_TEST_DATA (ORGANIZATION_ID CHAR(15) NOT NULL, k1 integer not null, v1 integer not null CONSTRAINT PK PRIMARY KEY (ORGANIZATION_ID, k1, v1) ) VERSIONS=1, SALT_BUCKETS=64");
>         conn.createStatement().execute("create sequence s cache " + Long.MAX_VALUE);
>         conn.close();
>         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10));
>         conn = DriverManager.getConnection(getUrl(), props);
>         for (int i = 0; i < 500000; i++) {
>         conn.createStatement().execute("upsert into DUMMY_SEQ_TEST_DATA values ('00Dxx0000001gEH'," + i + "," + i + ")");
>         }
>         conn.commit();
>         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 15));
>         conn = DriverManager.getConnection(getUrl(), props);
>         conn.setAutoCommit(true);
>         conn.createStatement().execute("upsert into DUMMY_CURSOR_STORAGE select ORGANIZATION_ID, 'MyQueryId', next value for s FROM DUMMY_SEQ_TEST_DATA");
>         //conn.commit(); 
>         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 20));
>         conn = DriverManager.getConnection(getUrl(), props);
>         ResultSet rs = conn.createStatement().executeQuery("select count(*) from DUMMY_CURSOR_STORAGE");
>         
>         assertTrue(rs.next());
>         assertEquals(500000, rs.getLong(1));
>         conn.close();
>         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 25));
>         conn = DriverManager.getConnection(getUrl(), props);
>         ResultSet rs2 = conn.createStatement().executeQuery("select cursor_order from DUMMY_CURSOR_STORAGE");
>         long seq = 1;
>         while (rs2.next()) {
>             assertEquals(seq, rs2.getLong(1));
>             seq++;
>         }
>         conn.close();
>     
>     }
> {code}


