You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/08/16 22:49:31 UTC

[phoenix] branch master updated: PHOENIX-5348: Fix flaky test: testIndexRebuildTask

This is an automated email from the ASF dual-hosted git repository.

chinmayskulkarni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f8b3c2  PHOENIX-5348: Fix flaky test: testIndexRebuildTask
4f8b3c2 is described below

commit 4f8b3c2207a56c4afaf2502c0af26a1f4b63f2bb
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Thu Jul 25 11:04:18 2019 -0700

    PHOENIX-5348: Fix flaky test: testIndexRebuildTask
    
    Signed-off-by: Chinmay Kulkarni <ch...@apache.org>
---
 .../phoenix/end2end/DropTableWithViewsIT.java      | 33 ++++++++-
 .../apache/phoenix/end2end/IndexRebuildTaskIT.java | 86 +++++++++++-----------
 .../phoenix/end2end/index/IndexMetadataIT.java     |  5 +-
 .../phoenix/coprocessor/TaskRegionObserver.java    | 12 ++-
 .../coprocessor/tasks/IndexRebuildTask.java        |  4 +-
 .../index/PhoenixIndexImportDirectReducer.java     |  7 +-
 .../org/apache/phoenix/schema/MetaDataClient.java  |  2 +-
 .../java/org/apache/phoenix/schema/task/Task.java  | 19 ++++-
 8 files changed, 104 insertions(+), 64 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java
index 6741585..2589fa3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.end2end;
 
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -26,6 +27,7 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.Collection;
 
@@ -137,7 +139,8 @@ public class DropTableWithViewsIT extends SplitSystemCatalogIT {
             task.run();
             task.run();
 
-            assertTaskColumns(conn, PTable.TaskStatus.COMPLETED.toString(), PTable.TaskType.DROP_CHILD_VIEWS, null);
+            assertTaskColumns(conn, PTable.TaskStatus.COMPLETED.toString(), PTable.TaskType.DROP_CHILD_VIEWS,
+                    null, null, null, null, null);
 
             // Views should be dropped by now
             TableName linkTable = TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES);
@@ -156,7 +159,9 @@ public class DropTableWithViewsIT extends SplitSystemCatalogIT {
         }
     }
 
-    public static void assertTaskColumns(Connection conn, String expectedStatus, PTable.TaskType taskType, String expectedData)
+    public static void assertTaskColumns(Connection conn, String expectedStatus, PTable.TaskType taskType,
+            String expectedTableName, String expectedTenantId, String expectedSchema, Timestamp expectedTs,
+            String expectedIndexName)
             throws SQLException {
         ResultSet rs = conn.createStatement().executeQuery("SELECT * " +
                 " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
@@ -166,9 +171,29 @@ public class DropTableWithViewsIT extends SplitSystemCatalogIT {
         String taskStatus = rs.getString(PhoenixDatabaseMetaData.TASK_STATUS);
         assertEquals(expectedStatus, taskStatus);
 
-        if (expectedData != null) {
+        if (expectedTableName != null) {
+            String tableName = rs.getString(PhoenixDatabaseMetaData.TABLE_NAME);
+            assertEquals(expectedTableName, tableName);
+        }
+
+        if (expectedTenantId != null) {
+            String tenantId = rs.getString(PhoenixDatabaseMetaData.TENANT_ID);
+            assertEquals(expectedTenantId, tenantId);
+        }
+
+        if (expectedSchema != null) {
+            String schema = rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM);
+            assertEquals(expectedSchema, schema);
+        }
+
+        if (expectedTs != null) {
+            Timestamp ts = rs.getTimestamp(PhoenixDatabaseMetaData.TASK_TS);
+            assertEquals(expectedTs, ts);
+        }
+
+        if (expectedIndexName != null) {
             String data = rs.getString(PhoenixDatabaseMetaData.TASK_DATA);
-            assertEquals(expectedData, data);
+            assertEquals(true, data.contains("\"IndexName\":\"" + expectedIndexName));
         }
     }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
index c63cf2c..fc514ef 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
@@ -35,8 +35,6 @@ import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -49,12 +47,11 @@ import java.util.HashMap;
 import java.util.Properties;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.waitForIndexRebuild;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-// disabled -- see PHOENIX-5348
-public abstract class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
+public class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
     protected static String TENANT1 = "tenant1";
     private static RegionCoprocessorEnvironment TaskRegionEnvironment;
 
@@ -83,15 +80,16 @@ public abstract class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
     @Test
     public void testIndexRebuildTask() throws Throwable {
         String baseTable = generateUniqueName();
+        String viewName = generateUniqueName();
         Connection conn = null;
-        Connection viewConn = null;
+        Connection tenantConn = null;
         try {
             conn = DriverManager.getConnection(getUrl());
             conn.setAutoCommit(false);
             Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
             props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, TENANT1);
 
-            viewConn =DriverManager.getConnection(getUrl(), props);
+            tenantConn =DriverManager.getConnection(getUrl(), props);
             String ddlFormat =
                     "CREATE TABLE IF NOT EXISTS " + baseTable + "  ("
                             + " %s PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR "
@@ -99,45 +97,37 @@ public abstract class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute(generateDDL(ddlFormat));
             conn.commit();
             // Create a view
-            String viewName = generateUniqueName();
             String viewDDL = "CREATE VIEW " + viewName + " AS SELECT * FROM " + baseTable;
-            viewConn.createStatement().execute(viewDDL);
+            tenantConn.createStatement().execute(viewDDL);
 
             // Create index
             String indexName = generateUniqueName();
             String idxSDDL = String.format("CREATE INDEX %s ON %s (V1)", indexName, viewName);
 
-            viewConn.createStatement().execute(idxSDDL);
+            tenantConn.createStatement().execute(idxSDDL);
 
             // Insert rows
             int numOfValues = 1000;
             for (int i=0; i < numOfValues; i++){
-                viewConn.createStatement().execute(
+                tenantConn.createStatement().execute(
                         String.format("UPSERT INTO %s VALUES('%s', '%s', '%s')", viewName, String.valueOf(i), "y",
                                 "z"));
             }
-            viewConn.commit();
-
-            String data = "{IndexName:" + indexName + "}";
-            // Run IndexRebuildTask
-            TaskRegionObserver.SelfHealingTask task =
-                    new TaskRegionObserver.SelfHealingTask(
-                            TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
-
-            Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
-            // Add a task to System.Task to build indexes
-            Task.addTask(conn.unwrap(PhoenixConnection.class), PTable.TaskType.INDEX_REBUILD,
-                    TENANT1, null, viewName,
-                    PTable.TaskStatus.CREATED.toString(), data, null, startTs, null, true);
-
-
-            task.run();
+            tenantConn.commit();
 
+            waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
             String viewIndexTableName = MetaDataUtil.getViewIndexPhysicalName(baseTable);
             ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
-            int count = getUtility().countRows(queryServices.getTable(Bytes.toBytes(viewIndexTableName)));
-            assertTrue(count == numOfValues);
 
+            Table indexHTable = queryServices.getTable(Bytes.toBytes(viewIndexTableName));
+            int count = getUtility().countRows(indexHTable);
+            assertEquals(numOfValues, count);
+
+            // Alter to Unusable makes the index status inactive.
+            // If I Alter to DISABLE, it fails to in Index tool while setting state to active due to Invalid transition.
+            tenantConn.createStatement().execute(
+                    String.format("ALTER INDEX %s ON %s UNUSABLE", indexName, viewName));
+            tenantConn.commit();
 
             // Remove index contents and try again
             Admin admin = queryServices.getAdmin();
@@ -145,50 +135,56 @@ public abstract class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
             admin.disableTable(tableName);
             admin.truncateTable(tableName, false);
 
-            data = "{IndexName:" + indexName + ", DisableBefore:true}";
+            count = getUtility().countRows(indexHTable);
+            assertEquals(0, count);
+
+            String data = "{IndexName:" + indexName + ", DisableBefore: true}";
+
+            // Run IndexRebuildTask
+            TaskRegionObserver.SelfHealingTask task =
+                    new TaskRegionObserver.SelfHealingTask(
+                            TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
 
-            // Add a new task (update status to created) to System.Task to rebuild indexes
+            Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
             Task.addTask(conn.unwrap(PhoenixConnection.class), PTable.TaskType.INDEX_REBUILD,
                     TENANT1, null, viewName,
                     PTable.TaskStatus.CREATED.toString(), data, null, startTs, null, true);
             task.run();
 
-            Table systemHTable= queryServices.getTable(Bytes.toBytes("SYSTEM."+PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE));
-            count = getUtility().countRows(systemHTable);
-            assertEquals(1, count);
-
             // Check task status and other column values.
-            waitForTaskState(conn, PTable.TaskType.INDEX_REBUILD, PTable.TaskStatus.COMPLETED);
+            waitForTaskState(conn, PTable.TaskType.INDEX_REBUILD, viewName, PTable.TaskStatus.COMPLETED);
 
             // See that index is rebuilt and confirm index has rows
-            Table htable= queryServices.getTable(Bytes.toBytes(viewIndexTableName));
-            count = getUtility().countRows(htable);
+            count = getUtility().countRows(indexHTable);
             assertEquals(numOfValues, count);
         } finally {
-            conn.createStatement().execute("DELETE " + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
-            conn.commit();
             if (conn != null) {
+                conn.createStatement().execute("DELETE " + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME
+                        + " WHERE TABLE_NAME ='" + viewName  + "'");
+                conn.commit();
                 conn.close();
             }
-            if (viewConn != null) {
-                viewConn.close();
+            if (tenantConn != null) {
+                tenantConn.close();
             }
         }
     }
 
-    public static void waitForTaskState(Connection conn, PTable.TaskType taskType, PTable.TaskStatus expectedTaskStatus) throws InterruptedException,
+    public static void waitForTaskState(Connection conn, PTable.TaskType taskType, String expectedTableName,
+            PTable.TaskStatus expectedTaskStatus) throws InterruptedException,
             SQLException {
         int maxTries = 100, nTries = 0;
         do {
             Thread.sleep(2000);
             ResultSet rs = conn.createStatement().executeQuery("SELECT * " +
                     " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
-                    " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = " +
+                    " WHERE " + PhoenixDatabaseMetaData.TABLE_NAME + "='" + expectedTableName + "' AND " +
+                    PhoenixDatabaseMetaData.TASK_TYPE + " = " +
                     taskType.getSerializedValue());
 
             String taskStatus = null;
 
-            if (rs.next()) {
+            while (rs.next()) {
                 taskStatus = rs.getString(PhoenixDatabaseMetaData.TASK_STATUS);
                 boolean matchesExpected = (expectedTaskStatus.toString().equals(taskStatus));
                 if (matchesExpected) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
index 395fcdf..6352d35 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
@@ -658,7 +658,8 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
             upsertStmt.setString(2, "val3");
             upsertStmt.execute();
 
-            conn.createStatement().execute("DELETE " + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
+            conn.createStatement().execute("DELETE " + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
+                    " WHERE TABLE_NAME ='" + testTable  + "'");
             conn.commit();
 
             conn.createStatement().execute(
@@ -676,7 +677,7 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
 
             // Check task status and other column values.
             DropTableWithViewsIT.assertTaskColumns(conn, PTable.TaskStatus.COMPLETED.toString(),
-                    PTable.TaskType.INDEX_REBUILD, null);
+                    PTable.TaskType.INDEX_REBUILD, testTable, null, null, null,indexName);
 
             // Check that the value is updated to correct one
             val = getIndexValue(conn, indexName, 2);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
index a48d82f..872ea3b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
@@ -190,15 +190,19 @@ public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
 
                         if (result == null) {
                             // reread task record. There might be async setting of task status
-                            taskRecord = Task.queryTaskTable(connForTask, taskRecord.getSchemaName(), taskRecord.getTableName(),
-                                    taskType, taskRecord.getTenantId(), null).get(0);
+                            taskRecord =
+                                    Task.queryTaskTable(connForTask, taskRecord.getTimeStamp(),
+                                            taskRecord.getSchemaName(), taskRecord.getTableName(),
+                                            taskType, taskRecord.getTenantId(), null).get(0);
                             if (taskRecord.getStatus() != null && Arrays.stream(excludeStates).anyMatch(taskRecord.getStatus()::equals)) {
                                 continue;
                             }
+
                             // Change task status to STARTED
                             Task.addTask(connForTask, taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(),
                                     taskRecord.getTableName(), PTable.TaskStatus.STARTED.toString(),
-                                    taskRecord.getData(), taskRecord.getPriority(), taskRecord.getTimeStamp(), null, true);
+                                    taskRecord.getData(), taskRecord.getPriority(), taskRecord.getTimeStamp(), null,
+                                    true);
 
                             // invokes the method at runtime
                             result = (TaskResult) runMethod.invoke(obj, taskRecord);
@@ -240,7 +244,7 @@ public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
         }
 
         public static void setEndTaskStatus(PhoenixConnection connForTask, Task.TaskRecord taskRecord, String taskStatus)
-                throws IOException {
+                throws IOException, SQLException {
             // update data with details.
             String data = taskRecord.getData();
             if (Strings.isNullOrEmpty(data)) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
index ad3e5db..168605f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
@@ -162,7 +162,9 @@ public class IndexRebuildTask extends BaseTask  {
             Cluster cluster = new Cluster(configuration);
 
             Job job = cluster.getJob(org.apache.hadoop.mapreduce.JobID.forName(jobID));
-
+            if (job == null) {
+                return new  TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SKIPPED, "");
+            }
             if (job != null && job.isComplete()) {
                 if (job.isSuccessful()) {
                     LOGGER.warn("IndexRebuildTask checkCurrentResult job is successful "
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index 5424407..0e40aff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -20,11 +20,9 @@ package org.apache.phoenix.mapreduce.index;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.sql.Timestamp;
 import java.util.List;
 import java.util.Properties;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -36,7 +34,6 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.task.Task;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,12 +72,12 @@ public class PhoenixIndexImportDirectReducer extends
             String schemaName = SchemaUtil.getSchemaNameFromFullName(fullTableName);
             String tableName = SchemaUtil.getTableNameFromFullName(fullTableName);
             String indexName = PhoenixConfigurationUtil.getDisableIndexes(context.getConfiguration());
-            List<Task.TaskRecord> taskRecords = Task.queryTaskTable(connection, schemaName, tableName,
+            List<Task.TaskRecord> taskRecords = Task.queryTaskTable(connection, null, schemaName, tableName,
                     PTable.TaskType.INDEX_REBUILD, tenantId, indexName);
             if (taskRecords != null && taskRecords.size() > 0) {
                 for (Task.TaskRecord taskRecord : taskRecords) {
                     TaskRegionObserver.SelfHealingTask.setEndTaskStatus(
-                            connection.unwrap(PhoenixConnection.class), taskRecords.get(0),
+                            connection.unwrap(PhoenixConnection.class), taskRecord,
                             PTable.TaskStatus.COMPLETED.toString());
                 }
             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 4afb0a0..8a794ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -4354,7 +4354,7 @@ public class MetaDataClient {
                     indexRef.setTable(result.getTable());
                     if (newIndexState == PIndexState.BUILDING && isAsync) {
                         if (isRebuildAll) {
-                            List<Task.TaskRecord> tasks = Task.queryTaskTable(connection, schemaName, tableName, PTable.TaskType.INDEX_REBUILD,
+                            List<Task.TaskRecord> tasks = Task.queryTaskTable(connection, null, schemaName, tableName, PTable.TaskType.INDEX_REBUILD,
                                     tenantId, indexName);
                             if (tasks == null || tasks.size() == 0) {
                                 Timestamp ts = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
index 43f293e..4161667 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
@@ -35,6 +35,7 @@ import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
 
 public class Task {
@@ -202,7 +203,8 @@ public class Task {
         return result;
     }
 
-    public static List<TaskRecord> queryTaskTable(Connection connection, String schema, String tableName,
+    public static List<TaskRecord> queryTaskTable(Connection connection, Timestamp ts,
+            String schema, String tableName,
             PTable.TaskType taskType, String tenantId, String indexName)
             throws SQLException {
         String taskQuery = "SELECT " +
@@ -230,7 +232,20 @@ public class Task {
                 taskQuery += " AND " + PhoenixDatabaseMetaData.TASK_DATA + " LIKE '%" + indexName + "%'";
             }
 
-        return populateTasks(connection, taskQuery);
+            List<TaskRecord> taskRecords = populateTasks(connection, taskQuery);
+            List<TaskRecord> result = new ArrayList<TaskRecord>();
+            if (ts != null) {
+                // Adding TASK_TS to the where clause did not work. It returns empty when directly querying with the timestamp.
+                for (TaskRecord tr : taskRecords) {
+                    if (tr.getTimeStamp().equals(ts)) {
+                        result.add(tr);
+                    }
+                }
+            } else {
+                result = taskRecords;
+            }
+
+        return result;
     }
 
     public static List<TaskRecord> queryTaskTable(Connection connection, String[] excludedTaskStatus)