Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2020/09/24 12:24:59 UTC

[GitHub] [phoenix] sakshamgangwar opened a new pull request #896: Phoenix-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException

sakshamgangwar opened a new pull request #896:
URL: https://github.com/apache/phoenix/pull/896


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [phoenix] sakshamgangwar commented on pull request #896: Phoenix-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException

Posted by GitBox <gi...@apache.org>.
sakshamgangwar commented on pull request #896:
URL: https://github.com/apache/phoenix/pull/896#issuecomment-698312034


   @ChinmaySKulkarni @dbwong I will need your review on this. Thanks.





[GitHub] [phoenix] gjacoby126 closed pull request #896: Phoenix-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException

Posted by GitBox <gi...@apache.org>.
gjacoby126 closed pull request #896:
URL: https://github.com/apache/phoenix/pull/896


   





[GitHub] [phoenix] sakshamgangwar commented on a change in pull request #896: Phoenix-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException

Posted by GitBox <gi...@apache.org>.
sakshamgangwar commented on a change in pull request #896:
URL: https://github.com/apache/phoenix/pull/896#discussion_r494574352



##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -127,6 +146,77 @@ public void testMapReduceSnapshotWithLimit() throws Exception {
     configureJob(job, tableName, inputQuery, null, false);
   }
 
+  @Test
+  public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception {
+    //Submitting and asserting successful Map Reduce Job over snapshots
+    PhoenixMapReduceUtil
+            .setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null,
+                    FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, false);
+
+    // create table
+    Connection conn = DriverManager.getConnection(getUrl());
+    tableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+    conn.commit();
+
+    //Submitting next map reduce job over table and making sure that it does not fail with
+    // any wrong snapshot properties set in common configurations which are
+    // used across all jobs.
+    createAndTestJob(conn);
+  }
+
+  private void createAndTestJob(Connection conn)
+          throws SQLException, IOException, InterruptedException, ClassNotFoundException {
+    String stockTableName = generateUniqueName();
+    String stockStatsTableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));
+    conn.createStatement().execute(String.format(CREATE_STOCK_STATS_TABLE, stockStatsTableName));
+    conn.commit();
+    final Configuration conf = ((PhoenixConnection) conn).getQueryServices().getConfiguration();
+    Job job = Job.getInstance(conf);
+    PhoenixMapReduceUtil.setInput(job, MapReduceIT.StockWritable.class, PhoenixTestingInputFormat.class,
+            stockTableName, null, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+    testJob(conn, job, stockTableName, stockStatsTableName);
+  }
+
+  private void testJob(Connection conn, Job job, String stockTableName, String stockStatsTableName)
+          throws SQLException, InterruptedException, IOException, ClassNotFoundException {
+    assertEquals("Failed to reset getRegionBoundaries counter for scanGrouper", 0,
+            TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
+    upsertData(conn, stockTableName);
+
+    // only run locally, rather than having to spin up a MiniMapReduce cluster and lets us use breakpoints
+    job.getConfiguration().set("mapreduce.framework.name", "local");
+
+    setOutput(job, stockStatsTableName);
+
+    job.setMapperClass(MapReduceIT.StockMapper.class);
+    job.setReducerClass(MapReduceIT.StockReducer.class);
+    job.setOutputFormatClass(PhoenixOutputFormat.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(DoubleWritable.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(MapReduceIT.StockWritable.class);
+
+    // run job and assert if success
+    assertTrue("Job didn't complete successfully! Check logs for reason.", job.waitForCompletion(true));

Review comment:
       @ChinmaySKulkarni Added the assertion both after the snapshot MR job and after the table-based job.







[GitHub] [phoenix] sakshamgangwar commented on a change in pull request #896: Phoenix-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException

Posted by GitBox <gi...@apache.org>.
sakshamgangwar commented on a change in pull request #896:
URL: https://github.com/apache/phoenix/pull/896#discussion_r494573706



##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -63,13 +71,24 @@
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class);
 
+  private static final String STOCK_NAME = "STOCK_NAME";
+  private static final String RECORDING_YEAR = "RECORDING_YEAR";
+  private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";
+  private static final String MAX_RECORDING = "MAX_RECORDING";
   private final static String SNAPSHOT_NAME = "FOO";
   private static final String FIELD1 = "FIELD1";
   private static final String FIELD2 = "FIELD2";
   private static final String FIELD3 = "FIELD3";
   private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
       " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))";
   private String UPSERT = "UPSERT into %s values (?, ?, ?)";
+  private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
+          " STOCK_NAME VARCHAR NOT NULL , RECORDING_YEAR  INTEGER NOT  NULL,  RECORDINGS_QUARTER " +
+          " DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( STOCK_NAME, RECORDING_YEAR )) "
+          + "SPLIT ON ('AA')";
+  private static final String CREATE_STOCK_STATS_TABLE =

Review comment:
       @ChinmaySKulkarni I have updated it.







[GitHub] [phoenix] sakshamgangwar commented on a change in pull request #896: Phoenix-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException

Posted by GitBox <gi...@apache.org>.
sakshamgangwar commented on a change in pull request #896:
URL: https://github.com/apache/phoenix/pull/896#discussion_r494615171



##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -63,13 +71,24 @@
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class);
 
+  private static final String STOCK_NAME = "STOCK_NAME";
+  private static final String RECORDING_YEAR = "RECORDING_YEAR";
+  private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";
+  private static final String MAX_RECORDING = "MAX_RECORDING";
   private final static String SNAPSHOT_NAME = "FOO";
   private static final String FIELD1 = "FIELD1";
   private static final String FIELD2 = "FIELD2";
   private static final String FIELD3 = "FIELD3";
   private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
       " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))";
   private String UPSERT = "UPSERT into %s values (?, ?, ?)";
+  private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +

Review comment:
       done

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -127,6 +146,77 @@ public void testMapReduceSnapshotWithLimit() throws Exception {
     configureJob(job, tableName, inputQuery, null, false);
   }
 
+  @Test
+  public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception {
+    //Submitting and asserting successful Map Reduce Job over snapshots
+    PhoenixMapReduceUtil
+            .setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null,
+                    FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, false);
+
+    // create table
+    Connection conn = DriverManager.getConnection(getUrl());

Review comment:
       using it now







[GitHub] [phoenix] yanxinyi commented on pull request #896: Phoenix-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException

Posted by GitBox <gi...@apache.org>.
yanxinyi commented on pull request #896:
URL: https://github.com/apache/phoenix/pull/896#issuecomment-701118245


   lgtm +1





[GitHub] [phoenix] ChinmaySKulkarni commented on a change in pull request #896: Phoenix-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException

Posted by GitBox <gi...@apache.org>.
ChinmaySKulkarni commented on a change in pull request #896:
URL: https://github.com/apache/phoenix/pull/896#discussion_r494498824



##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -63,13 +71,24 @@
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class);
 
+  private static final String STOCK_NAME = "STOCK_NAME";
+  private static final String RECORDING_YEAR = "RECORDING_YEAR";
+  private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";
+  private static final String MAX_RECORDING = "MAX_RECORDING";
   private final static String SNAPSHOT_NAME = "FOO";
   private static final String FIELD1 = "FIELD1";
   private static final String FIELD2 = "FIELD2";
   private static final String FIELD3 = "FIELD3";
   private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
       " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))";
   private String UPSERT = "UPSERT into %s values (?, ?, ?)";
+  private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +

Review comment:
       nit: Use the variables for column names in the DDL statements as well
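
       For readers following along, a minimal standalone sketch (not code from the PR) of what this suggestion looks like: the DDL template is assembled from the same column-name constants, so a rename only touches one place. The class name and the table name substituted in `main` are illustrative.

```java
// Illustrative sketch only: build the stock-table DDL from the column-name
// constants instead of repeating the literals inside the SQL string.
public class DdlFromConstants {
    public static final String STOCK_NAME = "STOCK_NAME";
    public static final String RECORDING_YEAR = "RECORDING_YEAR";
    public static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";

    public static final String CREATE_STOCK_TABLE =
        "CREATE TABLE IF NOT EXISTS %s ( "
            + STOCK_NAME + " VARCHAR NOT NULL, "
            + RECORDING_YEAR + " INTEGER NOT NULL, "
            + RECORDINGS_QUARTER + " DOUBLE ARRAY[] "
            + "CONSTRAINT pk PRIMARY KEY (" + STOCK_NAME + ", " + RECORDING_YEAR + ")) "
            + "SPLIT ON ('AA')";

    public static void main(String[] args) {
        // Hypothetical table name, substituted the same way the test does it.
        System.out.println(String.format(CREATE_STOCK_TABLE, "STOCK_T1"));
    }
}
```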

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -63,13 +71,24 @@
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class);
 
+  private static final String STOCK_NAME = "STOCK_NAME";
+  private static final String RECORDING_YEAR = "RECORDING_YEAR";
+  private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";
+  private static final String MAX_RECORDING = "MAX_RECORDING";
   private final static String SNAPSHOT_NAME = "FOO";
   private static final String FIELD1 = "FIELD1";
   private static final String FIELD2 = "FIELD2";
   private static final String FIELD3 = "FIELD3";
   private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
       " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))";
   private String UPSERT = "UPSERT into %s values (?, ?, ?)";
+  private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
+          " STOCK_NAME VARCHAR NOT NULL , RECORDING_YEAR  INTEGER NOT  NULL,  RECORDINGS_QUARTER " +
+          " DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( STOCK_NAME, RECORDING_YEAR )) "
+          + "SPLIT ON ('AA')";
+  private static final String CREATE_STOCK_STATS_TABLE =

Review comment:
       same here

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -127,6 +146,77 @@ public void testMapReduceSnapshotWithLimit() throws Exception {
     configureJob(job, tableName, inputQuery, null, false);
   }
 
+  @Test
+  public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception {
+    //Submitting and asserting successful Map Reduce Job over snapshots
+    PhoenixMapReduceUtil
+            .setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null,
+                    FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, false);
+
+    // create table
+    Connection conn = DriverManager.getConnection(getUrl());
+    tableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+    conn.commit();
+
+    //Submitting next map reduce job over table and making sure that it does not fail with
+    // any wrong snapshot properties set in common configurations which are
+    // used across all jobs.
+    createAndTestJob(conn);
+  }
+
+  private void createAndTestJob(Connection conn)
+          throws SQLException, IOException, InterruptedException, ClassNotFoundException {
+    String stockTableName = generateUniqueName();
+    String stockStatsTableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));

Review comment:
       ditto about try-with-resources

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -127,6 +146,77 @@ public void testMapReduceSnapshotWithLimit() throws Exception {
     configureJob(job, tableName, inputQuery, null, false);
   }
 
+  @Test
+  public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception {
+    //Submitting and asserting successful Map Reduce Job over snapshots
+    PhoenixMapReduceUtil
+            .setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null,
+                    FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, false);
+
+    // create table
+    Connection conn = DriverManager.getConnection(getUrl());

Review comment:
       Use try-with-resources when opening connections and statements throughout.
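
       For readers unfamiliar with the pattern being requested, a minimal self-contained sketch. The `Fake*` classes are stand-ins for `java.sql.Connection`/`Statement` (not Phoenix classes); the point is that try-with-resources closes each resource automatically, in reverse declaration order, even if the body throws.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the try-with-resources pattern: resources declared in the header
// are closed automatically when the block exits, innermost first.
public class TryWithResourcesDemo {
    public static final List<String> closed = new ArrayList<>();

    static class FakeConnection implements AutoCloseable {
        FakeStatement createStatement() { return new FakeStatement(); }
        @Override public void close() { closed.add("connection"); }
    }

    static class FakeStatement implements AutoCloseable {
        void execute(String sql) { /* pretend to run the DDL */ }
        @Override public void close() { closed.add("statement"); }
    }

    public static void main(String[] args) {
        try (FakeConnection conn = new FakeConnection();
             FakeStatement stmt = conn.createStatement()) {
            stmt.execute("CREATE TABLE IF NOT EXISTS T1 (ID VARCHAR PRIMARY KEY)");
        } // stmt closes first, then conn -- no explicit close() calls needed
        System.out.println(closed); // [statement, connection]
    }
}
```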

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -170,6 +260,22 @@ private void configureJob(Job job, String tableName, String inputQuery, String c
     }
   }
 
+  private void upsertData(Connection conn, String stockTableName) throws SQLException {
+    PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, stockTableName));

Review comment:
       ditto about try-with-resources

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -127,6 +146,77 @@ public void testMapReduceSnapshotWithLimit() throws Exception {
     configureJob(job, tableName, inputQuery, null, false);
   }
 
+  @Test
+  public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception {
+    //Submitting and asserting successful Map Reduce Job over snapshots
+    PhoenixMapReduceUtil
+            .setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null,
+                    FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, false);
+
+    // create table
+    Connection conn = DriverManager.getConnection(getUrl());
+    tableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+    conn.commit();
+
+    //Submitting next map reduce job over table and making sure that it does not fail with
+    // any wrong snapshot properties set in common configurations which are
+    // used across all jobs.
+    createAndTestJob(conn);
+  }
+
+  private void createAndTestJob(Connection conn)
+          throws SQLException, IOException, InterruptedException, ClassNotFoundException {
+    String stockTableName = generateUniqueName();
+    String stockStatsTableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));
+    conn.createStatement().execute(String.format(CREATE_STOCK_STATS_TABLE, stockStatsTableName));
+    conn.commit();
+    final Configuration conf = ((PhoenixConnection) conn).getQueryServices().getConfiguration();
+    Job job = Job.getInstance(conf);
+    PhoenixMapReduceUtil.setInput(job, MapReduceIT.StockWritable.class, PhoenixTestingInputFormat.class,
+            stockTableName, null, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+    testJob(conn, job, stockTableName, stockStatsTableName);
+  }
+
+  private void testJob(Connection conn, Job job, String stockTableName, String stockStatsTableName)
+          throws SQLException, InterruptedException, IOException, ClassNotFoundException {
+    assertEquals("Failed to reset getRegionBoundaries counter for scanGrouper", 0,
+            TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
+    upsertData(conn, stockTableName);
+
+    // only run locally, rather than having to spin up a MiniMapReduce cluster and lets us use breakpoints
+    job.getConfiguration().set("mapreduce.framework.name", "local");
+
+    setOutput(job, stockStatsTableName);
+
+    job.setMapperClass(MapReduceIT.StockMapper.class);
+    job.setReducerClass(MapReduceIT.StockReducer.class);
+    job.setOutputFormatClass(PhoenixOutputFormat.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(DoubleWritable.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(MapReduceIT.StockWritable.class);
+
+    // run job and assert if success
+    assertTrue("Job didn't complete successfully! Check logs for reason.", job.waitForCompletion(true));

Review comment:
       Shouldn't we also get the final job conf here and test that it doesn't contain any snapshot related properties? Is that possible?
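
       A toy sketch of the check being proposed here. A plain `Map` stands in for `org.apache.hadoop.conf.Configuration`, and `"phoenix.mr.snapshot.name"` is a made-up key, not necessarily the property Phoenix actually sets; a real test would assert on whatever snapshot keys `PhoenixMapReduceUtil` writes into the job configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: after the table-based job finishes, verify its configuration carries
// no leftover snapshot property from the earlier snapshot-based job.
public class SnapshotConfCheck {
    // Hypothetical key name for illustration only.
    public static final String SNAPSHOT_KEY = "phoenix.mr.snapshot.name";

    public static boolean hasSnapshotProperty(Map<String, String> jobConf) {
        return jobConf.get(SNAPSHOT_KEY) != null;
    }

    public static void main(String[] args) {
        Map<String, String> jobConf = new HashMap<>();
        jobConf.put("mapreduce.framework.name", "local");
        // A clean table-based job conf should carry no snapshot key:
        System.out.println(hasSnapshotProperty(jobConf)); // false
    }
}
```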

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -127,6 +146,77 @@ public void testMapReduceSnapshotWithLimit() throws Exception {
     configureJob(job, tableName, inputQuery, null, false);
   }
 
+  @Test
+  public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception {
+    //Submitting and asserting successful Map Reduce Job over snapshots
+    PhoenixMapReduceUtil
+            .setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null,
+                    FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, false);
+
+    // create table
+    Connection conn = DriverManager.getConnection(getUrl());
+    tableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+    conn.commit();
+
+    //Submitting next map reduce job over table and making sure that it does not fail with
+    // any wrong snapshot properties set in common configurations which are
+    // used across all jobs.
+    createAndTestJob(conn);
+  }
+
+  private void createAndTestJob(Connection conn)
+          throws SQLException, IOException, InterruptedException, ClassNotFoundException {
+    String stockTableName = generateUniqueName();
+    String stockStatsTableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));

Review comment:
       Please also close statements everywhere 







[GitHub] [phoenix] ChinmaySKulkarni commented on a change in pull request #896: Phoenix-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException

Posted by GitBox <gi...@apache.org>.
ChinmaySKulkarni commented on a change in pull request #896:
URL: https://github.com/apache/phoenix/pull/896#discussion_r494709284



##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -127,6 +146,77 @@ public void testMapReduceSnapshotWithLimit() throws Exception {
     configureJob(job, tableName, inputQuery, null, false);
   }
 
+  @Test
+  public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception {
+    //Submitting and asserting successful Map Reduce Job over snapshots
+    PhoenixMapReduceUtil
+            .setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null,
+                    FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, false);
+
+    // create table
+    Connection conn = DriverManager.getConnection(getUrl());
+    tableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+    conn.commit();
+
+    //Submitting next map reduce job over table and making sure that it does not fail with
+    // any wrong snapshot properties set in common configurations which are
+    // used across all jobs.
+    createAndTestJob(conn);
+  }
+
+  private void createAndTestJob(Connection conn)
+          throws SQLException, IOException, InterruptedException, ClassNotFoundException {
+    String stockTableName = generateUniqueName();
+    String stockStatsTableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));

Review comment:
       Please also close statements everywhere 







[GitHub] [phoenix] sakshamgangwar commented on a change in pull request #896: Phoenix-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException

Posted by GitBox <gi...@apache.org>.
sakshamgangwar commented on a change in pull request #896:
URL: https://github.com/apache/phoenix/pull/896#discussion_r494574053



##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -127,6 +146,77 @@ public void testMapReduceSnapshotWithLimit() throws Exception {
     configureJob(job, tableName, inputQuery, null, false);
   }
 
+  @Test
+  public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception {
+    //Submitting and asserting successful Map Reduce Job over snapshots
+    PhoenixMapReduceUtil
+            .setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null,
+                    FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, false);
+
+    // create table
+    Connection conn = DriverManager.getConnection(getUrl());
+    tableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+    conn.commit();
+
+    //Submitting next map reduce job over table and making sure that it does not fail with
+    // any wrong snapshot properties set in common configurations which are
+    // used across all jobs.
+    createAndTestJob(conn);
+  }
+
+  private void createAndTestJob(Connection conn)
+          throws SQLException, IOException, InterruptedException, ClassNotFoundException {
+    String stockTableName = generateUniqueName();
+    String stockStatsTableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));

Review comment:
       Handled in try-with-resources block in calling method.







[GitHub] [phoenix] ChinmaySKulkarni commented on a change in pull request #896: Phoenix-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException

Posted by GitBox <gi...@apache.org>.
ChinmaySKulkarni commented on a change in pull request #896:
URL: https://github.com/apache/phoenix/pull/896#discussion_r494498824



##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -63,13 +71,24 @@
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class);
 
+  private static final String STOCK_NAME = "STOCK_NAME";
+  private static final String RECORDING_YEAR = "RECORDING_YEAR";
+  private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";
+  private static final String MAX_RECORDING = "MAX_RECORDING";
   private final static String SNAPSHOT_NAME = "FOO";
   private static final String FIELD1 = "FIELD1";
   private static final String FIELD2 = "FIELD2";
   private static final String FIELD3 = "FIELD3";
   private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
       " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))";
   private String UPSERT = "UPSERT into %s values (?, ?, ?)";
+  private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +

Review comment:
       nit: Use the variables for column names in the DDL statements as well

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -63,13 +71,24 @@
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class);
 
+  private static final String STOCK_NAME = "STOCK_NAME";
+  private static final String RECORDING_YEAR = "RECORDING_YEAR";
+  private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";
+  private static final String MAX_RECORDING = "MAX_RECORDING";
   private final static String SNAPSHOT_NAME = "FOO";
   private static final String FIELD1 = "FIELD1";
   private static final String FIELD2 = "FIELD2";
   private static final String FIELD3 = "FIELD3";
   private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
       " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))";
   private String UPSERT = "UPSERT into %s values (?, ?, ?)";
+  private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
+          " STOCK_NAME VARCHAR NOT NULL , RECORDING_YEAR  INTEGER NOT  NULL,  RECORDINGS_QUARTER " +
+          " DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( STOCK_NAME, RECORDING_YEAR )) "
+          + "SPLIT ON ('AA')";
+  private static final String CREATE_STOCK_STATS_TABLE =

Review comment:
       same here

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -127,6 +146,77 @@ public void testMapReduceSnapshotWithLimit() throws Exception {
     configureJob(job, tableName, inputQuery, null, false);
   }
 
+  @Test
+  public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception {
+    //Submitting and asserting successful Map Reduce Job over snapshots
+    PhoenixMapReduceUtil
+            .setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null,
+                    FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, false);
+
+    // create table
+    Connection conn = DriverManager.getConnection(getUrl());
+    tableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+    conn.commit();
+
+    //Submitting next map reduce job over table and making sure that it does not fail with
+    // any wrong snapshot properties set in common configurations which are
+    // used across all jobs.
+    createAndTestJob(conn);
+  }
+
+  private void createAndTestJob(Connection conn)
+          throws SQLException, IOException, InterruptedException, ClassNotFoundException {
+    String stockTableName = generateUniqueName();
+    String stockStatsTableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));

Review comment:
       ditto about try-with-resources

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -127,6 +146,77 @@ public void testMapReduceSnapshotWithLimit() throws Exception {
     configureJob(job, tableName, inputQuery, null, false);
   }
 
+  @Test
+  public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception {
+    //Submitting and asserting successful Map Reduce Job over snapshots
+    PhoenixMapReduceUtil
+            .setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null,
+                    FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, false);
+
+    // create table
+    Connection conn = DriverManager.getConnection(getUrl());

Review comment:
       Use try-with-resources when opening connections and statements throughout.

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -170,6 +260,22 @@ private void configureJob(Job job, String tableName, String inputQuery, String c
     }
   }
 
+  private void upsertData(Connection conn, String stockTableName) throws SQLException {
+    PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, stockTableName));

Review comment:
       ditto about try-with-resources

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -127,6 +146,77 @@ public void testMapReduceSnapshotWithLimit() throws Exception {
     configureJob(job, tableName, inputQuery, null, false);
   }
 
+  @Test
+  public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception {
+    // Submit a MapReduce job over the snapshot and assert that it succeeds
+    PhoenixMapReduceUtil
+            .setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null,
+                    FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, false);
+
+    // create table
+    Connection conn = DriverManager.getConnection(getUrl());
+    tableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+    conn.commit();
+
+    // Submit the next MapReduce job over the table and make sure it does not fail
+    // because of stale snapshot properties left in the common configuration
+    // shared across all jobs.
+    createAndTestJob(conn);
+  }
+
+  private void createAndTestJob(Connection conn)
+          throws SQLException, IOException, InterruptedException, ClassNotFoundException {
+    String stockTableName = generateUniqueName();
+    String stockStatsTableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));
+    conn.createStatement().execute(String.format(CREATE_STOCK_STATS_TABLE, stockStatsTableName));
+    conn.commit();
+    final Configuration conf = ((PhoenixConnection) conn).getQueryServices().getConfiguration();
+    Job job = Job.getInstance(conf);
+    PhoenixMapReduceUtil.setInput(job, MapReduceIT.StockWritable.class, PhoenixTestingInputFormat.class,
+            stockTableName, null, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+    testJob(conn, job, stockTableName, stockStatsTableName);
+  }
+
+  private void testJob(Connection conn, Job job, String stockTableName, String stockStatsTableName)
+          throws SQLException, InterruptedException, IOException, ClassNotFoundException {
+    assertEquals("Failed to reset getRegionBoundaries counter for scanGrouper", 0,
+            TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
+    upsertData(conn, stockTableName);
+
+    // Run locally rather than spinning up a MiniMapReduce cluster; this also lets us use breakpoints.
+    job.getConfiguration().set("mapreduce.framework.name", "local");
+
+    setOutput(job, stockStatsTableName);
+
+    job.setMapperClass(MapReduceIT.StockMapper.class);
+    job.setReducerClass(MapReduceIT.StockReducer.class);
+    job.setOutputFormatClass(PhoenixOutputFormat.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(DoubleWritable.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(MapReduceIT.StockWritable.class);
+
+    // Run the job and assert that it succeeded.
+    assertTrue("Job didn't complete successfully! Check logs for reason.", job.waitForCompletion(true));

Review comment:
       Shouldn't we also get the final job conf here and test that it doesn't contain any snapshot-related properties? Is that possible?
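
For illustration only, a sketch of the kind of assertion being asked for here. It uses a plain `Map` in place of the Hadoop `Configuration` (which is likewise iterable as key/value entries), and the property name is an assumption for the example — the real snapshot keys live in Phoenix's `PhoenixConfigurationUtil` and may differ.

```java
import java.util.HashMap;
import java.util.Map;

public class SnapshotConfCheck {

    // Illustrative key only; the actual constant is defined in PhoenixConfigurationUtil.
    static final String SNAPSHOT_NAME_KEY = "phoenix.mapreduce.snapshot.name";

    // Returns true if the conf-like map carries no snapshot-related leftovers
    // from a previous snapshot-based job.
    public static boolean isCleanOfSnapshotProps(Map<String, String> conf) {
        for (String key : conf.keySet()) {
            if (key.contains("snapshot")) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("mapreduce.framework.name", "local");
        System.out.println(isCleanOfSnapshotProps(conf)); // prints true

        conf.put(SNAPSHOT_NAME_KEY, "FOO");
        System.out.println(isCleanOfSnapshotProps(conf)); // prints false
    }
}
```

In the test itself, the equivalent check would iterate `job.getConfiguration()` after the table-based job completes and assert that no snapshot key survives from the earlier snapshot-based job.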




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [phoenix] gjacoby126 commented on pull request #896: Phoenix-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException

Posted by GitBox <gi...@apache.org>.
gjacoby126 commented on pull request #896:
URL: https://github.com/apache/phoenix/pull/896#issuecomment-730852506


   JIRA is marked resolved, closing PR





[GitHub] [phoenix] sakshamgangwar commented on a change in pull request #896: Phoenix-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException

Posted by GitBox <gi...@apache.org>.
sakshamgangwar commented on a change in pull request #896:
URL: https://github.com/apache/phoenix/pull/896#discussion_r494574994



##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -170,6 +260,22 @@ private void configureJob(Job job, String tableName, String inputQuery, String c
     }
   }
 
+  private void upsertData(Connection conn, String stockTableName) throws SQLException {
+    PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, stockTableName));

Review comment:
       handled in the caller test method.







[GitHub] [phoenix] sakshamgangwar commented on pull request #896: Phoenix-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException

Posted by GitBox <gi...@apache.org>.
sakshamgangwar commented on pull request #896:
URL: https://github.com/apache/phoenix/pull/896#issuecomment-698312034


   @ChinmaySKulkarni @dbwong I will need your review on this. Thanks.





[GitHub] [phoenix] sakshamgangwar commented on a change in pull request #896: Phoenix-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException

Posted by GitBox <gi...@apache.org>.
sakshamgangwar commented on a change in pull request #896:
URL: https://github.com/apache/phoenix/pull/896#discussion_r494573706



##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -63,13 +71,24 @@
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class);
 
+  private static final String STOCK_NAME = "STOCK_NAME";
+  private static final String RECORDING_YEAR = "RECORDING_YEAR";
+  private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";
+  private static final String MAX_RECORDING = "MAX_RECORDING";
   private final static String SNAPSHOT_NAME = "FOO";
   private static final String FIELD1 = "FIELD1";
   private static final String FIELD2 = "FIELD2";
   private static final String FIELD3 = "FIELD3";
   private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
       " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))";
   private String UPSERT = "UPSERT into %s values (?, ?, ?)";
+  private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
+          " STOCK_NAME VARCHAR NOT NULL , RECORDING_YEAR  INTEGER NOT  NULL,  RECORDINGS_QUARTER " +
+          " DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( STOCK_NAME, RECORDING_YEAR )) "
+          + "SPLIT ON ('AA')";
+  private static final String CREATE_STOCK_STATS_TABLE =

Review comment:
       @ChinmaySKulkarni I have updated it.

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -127,6 +146,77 @@ public void testMapReduceSnapshotWithLimit() throws Exception {
     configureJob(job, tableName, inputQuery, null, false);
   }
 
+  @Test
+  public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception {
+    // Submit a MapReduce job over the snapshot and assert that it succeeds
+    PhoenixMapReduceUtil
+            .setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null,
+                    FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, false);
+
+    // create table
+    Connection conn = DriverManager.getConnection(getUrl());
+    tableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+    conn.commit();
+
+    // Submit the next MapReduce job over the table and make sure it does not fail
+    // because of stale snapshot properties left in the common configuration
+    // shared across all jobs.
+    createAndTestJob(conn);
+  }
+
+  private void createAndTestJob(Connection conn)
+          throws SQLException, IOException, InterruptedException, ClassNotFoundException {
+    String stockTableName = generateUniqueName();
+    String stockStatsTableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));

Review comment:
       Handled in try-with-resources block in calling method.

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -127,6 +146,77 @@ public void testMapReduceSnapshotWithLimit() throws Exception {
     configureJob(job, tableName, inputQuery, null, false);
   }
 
+  @Test
+  public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception {
+    // Submit a MapReduce job over the snapshot and assert that it succeeds
+    PhoenixMapReduceUtil
+            .setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null,
+                    FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, false);
+
+    // create table
+    Connection conn = DriverManager.getConnection(getUrl());
+    tableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+    conn.commit();
+
+    // Submit the next MapReduce job over the table and make sure it does not fail
+    // because of stale snapshot properties left in the common configuration
+    // shared across all jobs.
+    createAndTestJob(conn);
+  }
+
+  private void createAndTestJob(Connection conn)
+          throws SQLException, IOException, InterruptedException, ClassNotFoundException {
+    String stockTableName = generateUniqueName();
+    String stockStatsTableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));
+    conn.createStatement().execute(String.format(CREATE_STOCK_STATS_TABLE, stockStatsTableName));
+    conn.commit();
+    final Configuration conf = ((PhoenixConnection) conn).getQueryServices().getConfiguration();
+    Job job = Job.getInstance(conf);
+    PhoenixMapReduceUtil.setInput(job, MapReduceIT.StockWritable.class, PhoenixTestingInputFormat.class,
+            stockTableName, null, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+    testJob(conn, job, stockTableName, stockStatsTableName);
+  }
+
+  private void testJob(Connection conn, Job job, String stockTableName, String stockStatsTableName)
+          throws SQLException, InterruptedException, IOException, ClassNotFoundException {
+    assertEquals("Failed to reset getRegionBoundaries counter for scanGrouper", 0,
+            TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
+    upsertData(conn, stockTableName);
+
+    // Run locally rather than spinning up a MiniMapReduce cluster; this also lets us use breakpoints.
+    job.getConfiguration().set("mapreduce.framework.name", "local");
+
+    setOutput(job, stockStatsTableName);
+
+    job.setMapperClass(MapReduceIT.StockMapper.class);
+    job.setReducerClass(MapReduceIT.StockReducer.class);
+    job.setOutputFormatClass(PhoenixOutputFormat.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(DoubleWritable.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(MapReduceIT.StockWritable.class);
+
+    // Run the job and assert that it succeeded.
+    assertTrue("Job didn't complete successfully! Check logs for reason.", job.waitForCompletion(true));

Review comment:
       @ChinmaySKulkarni Added the assertion both after snapshot MR job and after Table based job too.

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -170,6 +260,22 @@ private void configureJob(Job job, String tableName, String inputQuery, String c
     }
   }
 
+  private void upsertData(Connection conn, String stockTableName) throws SQLException {
+    PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, stockTableName));

Review comment:
       handled in the caller test method.

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -63,13 +71,24 @@
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class);
 
+  private static final String STOCK_NAME = "STOCK_NAME";
+  private static final String RECORDING_YEAR = "RECORDING_YEAR";
+  private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";
+  private static final String MAX_RECORDING = "MAX_RECORDING";
   private final static String SNAPSHOT_NAME = "FOO";
   private static final String FIELD1 = "FIELD1";
   private static final String FIELD2 = "FIELD2";
   private static final String FIELD3 = "FIELD3";
   private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
       " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))";
   private String UPSERT = "UPSERT into %s values (?, ?, ?)";
+  private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +

Review comment:
       done

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
##########
@@ -127,6 +146,77 @@ public void testMapReduceSnapshotWithLimit() throws Exception {
     configureJob(job, tableName, inputQuery, null, false);
   }
 
+  @Test
+  public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception {
+    // Submit a MapReduce job over the snapshot and assert that it succeeds
+    PhoenixMapReduceUtil
+            .setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null,
+                    FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, false);
+
+    // create table
+    Connection conn = DriverManager.getConnection(getUrl());

Review comment:
       using it now



