Posted to dev@phoenix.apache.org by "Jan Fernando (JIRA)" <ji...@apache.org> on 2014/07/17 02:34:06 UTC
[jira] [Comment Edited] (PHOENIX-1096) Duplicate sequence values
returned when doing upsert select against a salted table.
[ https://issues.apache.org/jira/browse/PHOENIX-1096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14064401#comment-14064401 ]
Jan Fernando edited comment on PHOENIX-1096 at 7/17/14 12:33 AM:
-----------------------------------------------------------------
I think I have found the root cause, but am still investigating. The issue appears to be that SequenceValueExpression is cached, and the byte[] valueBuffer that we write the sequence value into, as we loop over the rows from the SELECT part of UPSERT ... SELECT, is a member field and therefore not thread safe. ptr.set(valueBuffer) does not copy the bytes; it only points ptr at that shared array. I observed different threads writing to this buffer concurrently, so a value written by one thread could be overwritten by another before it was read, causing multiple threads to see the same sequence number. Moving valueBuffer to a local variable results in the correct number of mutations being generated and upserted into the target table.
Here is the initial change I made, which appears to address this. I am still running into some other ordering issues, so I don't want to declare victory yet. This is not an official patch request; I wanted to capture my findings to date for reference.
{code}
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
@@ -174,7 +174,7 @@ public class SequenceManager {
     private class SequenceValueExpression extends BaseTerminalExpression {
         private final int index;
-        private final byte[] valueBuffer = new byte[PDataType.LONG.getByteSize()];
         private SequenceValueExpression(int index) {
             this.index = index;
@@ -186,6 +186,7 @@ public class SequenceManager {
         @Override
         public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+            byte[] valueBuffer = new byte[PDataType.LONG.getByteSize()];
             PDataType.LONG.getCodec().encodeLong(tuple.getSequenceValue(index), valueBuffer, 0);
             ptr.set(valueBuffer);
             return true;
{code}
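To make the failure mode concrete, here is a minimal, self-contained Java model. It is not Phoenix code: BytesPtr, SharedBufferExpression, LocalBufferExpression and SequenceBufferDemo are hypothetical names, and ByteBuffer.putLong stands in for the PDataType.LONG codec. It replays, step by step rather than with real threads, the interleaving described above, where a second evaluation on the same cached expression overwrites the shared buffer before the first caller has read it.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical stand-in for HBase's ImmutableBytesWritable: like its set(byte[])
// method, set() here stores a reference to the array without copying it.
final class BytesPtr {
    private byte[] bytes;

    void set(byte[] b) {
        this.bytes = b; // reference only, no copy
    }

    long getLong() {
        return ByteBuffer.wrap(bytes).getLong();
    }
}

// Model of SequenceValueExpression before the change: one buffer per cached instance.
final class SharedBufferExpression {
    private final byte[] valueBuffer = new byte[Long.BYTES];

    void evaluate(long sequenceValue, BytesPtr ptr) {
        // Simplification: plain big-endian encoding instead of Phoenix's LONG codec.
        ByteBuffer.wrap(valueBuffer).putLong(sequenceValue);
        ptr.set(valueBuffer);
    }
}

// Model of the change: a fresh buffer on every evaluate() call.
final class LocalBufferExpression {
    void evaluate(long sequenceValue, BytesPtr ptr) {
        byte[] valueBuffer = new byte[Long.BYTES];
        ByteBuffer.wrap(valueBuffer).putLong(sequenceValue);
        ptr.set(valueBuffer);
    }
}

final class SequenceBufferDemo {
    // Replays one problematic interleaving (no real threads): "thread A" evaluates
    // sequence value 1, "thread B" evaluates 2 on the same expression instance,
    // and only then are the pointers read.
    static long[] replayShared() {
        SharedBufferExpression expr = new SharedBufferExpression();
        BytesPtr ptrA = new BytesPtr();
        BytesPtr ptrB = new BytesPtr();
        expr.evaluate(1L, ptrA);
        expr.evaluate(2L, ptrB); // overwrites the bytes ptrA still points at
        return new long[] {ptrA.getLong(), ptrB.getLong()};
    }

    // Same interleaving against the per-call buffer version.
    static long[] replayLocal() {
        LocalBufferExpression expr = new LocalBufferExpression();
        BytesPtr ptrA = new BytesPtr();
        BytesPtr ptrB = new BytesPtr();
        expr.evaluate(1L, ptrA);
        expr.evaluate(2L, ptrB); // writes to a different array
        return new long[] {ptrA.getLong(), ptrB.getLong()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(replayShared())); // [2, 2]: both rows see the same value
        System.out.println(Arrays.toString(replayLocal()));  // [1, 2]
    }
}
```

The trade-off of moving the buffer into evaluate() is one small allocation per evaluated row, in exchange for each pointer owning its own bytes.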
> Duplicate sequence values returned when doing upsert select against a salted table.
> -----------------------------------------------------------------------------------
>
> Key: PHOENIX-1096
> URL: https://issues.apache.org/jira/browse/PHOENIX-1096
> Project: Phoenix
> Issue Type: Bug
> Reporter: Samarth Jain
> Fix For: 3.0.0, 4.0.0, 5.0.0
>
>
> {code}
> @Test
> public void testUpsertSelectWithSequenceAndLargeDataSet() throws Exception {
>     long ts = nextTimestamp();
>     Properties props = new Properties();
>     //props.setProperty(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(64));
>     props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
>     Connection conn = DriverManager.getConnection(getUrl(), props);
>     String ddl = "CREATE TABLE IF NOT EXISTS DUMMY_CURSOR_STORAGE ("
>             + "ORGANIZATION_ID CHAR(15) NOT NULL, QUERY_ID CHAR(15) NOT NULL, CURSOR_ORDER BIGINT NOT NULL "
>             + "CONSTRAINT MAIN_PK PRIMARY KEY (ORGANIZATION_ID, QUERY_ID, CURSOR_ORDER) "
>             + ") SALT_BUCKETS = 64";
>     conn.createStatement().execute(ddl);
>     conn.createStatement().execute("CREATE TABLE DUMMY_SEQ_TEST_DATA (ORGANIZATION_ID CHAR(15) NOT NULL, k1 integer not null, v1 integer not null CONSTRAINT PK PRIMARY KEY (ORGANIZATION_ID, k1, v1) ) VERSIONS=1, SALT_BUCKETS=64");
>     conn.createStatement().execute("create sequence s cache " + Long.MAX_VALUE);
>     conn.close();
>
>     // Load 500,000 source rows.
>     props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10));
>     conn = DriverManager.getConnection(getUrl(), props);
>     for (int i = 0; i < 500000; i++) {
>         conn.createStatement().execute("upsert into DUMMY_SEQ_TEST_DATA values ('00Dxx0000001gEH'," + i + "," + i + ")");
>     }
>     conn.commit();
>     conn.close();
>
>     // Copy every source row into the salted target table, assigning one sequence value per row.
>     props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 15));
>     conn = DriverManager.getConnection(getUrl(), props);
>     conn.setAutoCommit(true);
>     conn.createStatement().execute("upsert into DUMMY_CURSOR_STORAGE select ORGANIZATION_ID, 'MyQueryId', next value for s FROM DUMMY_SEQ_TEST_DATA");
>     //conn.commit();
>     conn.close();
>
>     // Every source row must produce exactly one target row.
>     props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 20));
>     conn = DriverManager.getConnection(getUrl(), props);
>     ResultSet rs = conn.createStatement().executeQuery("select count(*) from DUMMY_CURSOR_STORAGE");
>     assertTrue(rs.next());
>     assertEquals(500000, rs.getLong(1));
>     conn.close();
>
>     // The sequence values must be exactly 1..500000, with no duplicates or gaps.
>     // ORDER BY is needed because non-aggregate scans over a salted table do not
>     // return rows in row key order by default.
>     props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 25));
>     conn = DriverManager.getConnection(getUrl(), props); // the previous connection is closed
>     ResultSet rs2 = conn.createStatement().executeQuery("select cursor_order from DUMMY_CURSOR_STORAGE order by cursor_order");
>     long seq = 1;
>     while (rs2.next()) {
>         assertEquals(seq, rs2.getLong(1));
>         seq++;
>     }
>     conn.close();
> }
> {code}
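The final loop checks that the cursor_order values form the contiguous run 1..500000. A check that gives the same answer regardless of the order in which rows are read could collect the values and test them as a set. The helper below (CursorOrderCheck.isContiguousFromOne) is a hypothetical sketch, not part of the Phoenix test suite.

```java
import java.util.BitSet;
import java.util.List;

final class CursorOrderCheck {
    // Returns true iff `values` contains each of 1..expectedCount exactly once,
    // independent of the order in which the values were read.
    static boolean isContiguousFromOne(List<Long> values, int expectedCount) {
        if (values.size() != expectedCount) {
            return false; // missing rows, or extra rows
        }
        BitSet seen = new BitSet(expectedCount + 1);
        for (long v : values) {
            if (v < 1 || v > expectedCount || seen.get((int) v)) {
                return false; // out of range, or a duplicate sequence value
            }
            seen.set((int) v);
        }
        // Size matches and no duplicates within range, so every value is present.
        return true;
    }
}
```

In the test, the values read from rs2 could be added to an ArrayList<Long> and passed to this helper with expectedCount set to 500000; a duplicate sequence value, such as the ones this issue produces, makes it return false.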
--
This message was sent by Atlassian JIRA
(v6.2#6252)