You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/01/17 18:10:47 UTC

samza git commit: SAMZA-1523: Cleanup table entries before shutting down the processor

Repository: samza
Updated Branches:
  refs/heads/master fcd2b9a53 -> 95a71fb9e


SAMZA-1523: Cleanup table entries before shutting down the processor

Modified `TableUtils#deleteProcessorEntity` to provide an option to disable optimistic locking during a call to the Azure Table Storage service.

sborya PawasChhokra nickpan47   Review please?

Author: navina <na...@apache.org>

Reviewers: Shanthoosh V<sv...@linkedin.com>, Boris S<bs...@linkedin.com>

Closes #379 from navina/azure-etag-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/95a71fb9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/95a71fb9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/95a71fb9

Branch: refs/heads/master
Commit: 95a71fb9e234c9fc7fcb8a4ec0bbab33c9555738
Parents: fcd2b9a
Author: navina <na...@apache.org>
Authored: Wed Jan 17 10:11:40 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Jan 17 10:11:40 2018 -0800

----------------------------------------------------------------------
 .../samza/coordinator/AzureJobCoordinator.java  | 75 ++++++++++----------
 .../samza/coordinator/data/ProcessorEntity.java |  8 +++
 .../scheduler/JMVersionUpgradeScheduler.java    |  9 +--
 .../java/org/apache/samza/util/TableUtils.java  | 49 +++++++++----
 4 files changed, 89 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/95a71fb9/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
index 468705b..ca3384d 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -19,22 +19,10 @@
 
 package org.apache.samza.coordinator;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.AzureClient;
-import org.apache.samza.config.AzureConfig;
-import org.apache.samza.coordinator.data.BarrierState;
 import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.AzureConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
@@ -42,8 +30,8 @@ import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
 import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.runtime.ProcessorIdGenerator;
+import org.apache.samza.coordinator.data.BarrierState;
+import org.apache.samza.coordinator.data.ProcessorEntity;
 import org.apache.samza.coordinator.scheduler.HeartbeatScheduler;
 import org.apache.samza.coordinator.scheduler.JMVersionUpgradeScheduler;
 import org.apache.samza.coordinator.scheduler.LeaderBarrierCompleteScheduler;
@@ -51,6 +39,8 @@ import org.apache.samza.coordinator.scheduler.LeaderLivenessCheckScheduler;
 import org.apache.samza.coordinator.scheduler.LivenessCheckScheduler;
 import org.apache.samza.coordinator.scheduler.RenewLeaseScheduler;
 import org.apache.samza.coordinator.scheduler.SchedulerStateChangeListener;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
@@ -64,6 +54,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConverters;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 
 /**
  * Class that provides coordination mechanism for Samza standalone in Azure.
@@ -80,7 +82,6 @@ public class AzureJobCoordinator implements JobCoordinator {
   private final TableUtils table;
   private final Config config;
   private final String processorId;
-  private final AzureClient client;
   private final AtomicReference<String> currentJMVersion;
   private final AtomicBoolean versionUpgradeDetected;
   private final HeartbeatScheduler heartbeat;
@@ -103,7 +104,7 @@ public class AzureJobCoordinator implements JobCoordinator {
     processorId = createProcessorId(config);
     currentJMVersion = new AtomicReference<>(INITIAL_STATE);
     AzureConfig azureConfig = new AzureConfig(config);
-    client = new AzureClient(azureConfig.getAzureConnectionString());
+    AzureClient client = new AzureClient(azureConfig.getAzureConnectionString());
     leaderBlob = new BlobUtils(client, azureConfig.getAzureContainerName(), azureConfig.getAzureBlobName(), azureConfig.getAzureBlobLength());
     errorHandler = (errorMsg) -> {
       LOG.error(errorMsg);
@@ -149,19 +150,18 @@ public class AzureJobCoordinator implements JobCoordinator {
   public void stop() {
     LOG.info("Shutting down Azure job coordinator.");
 
-    if (coordinatorListener != null) {
-      coordinatorListener.onJobModelExpired();
-    }
-
-    // Resign leadership
-    if (azureLeaderElector.amILeader()) {
-      azureLeaderElector.resignLeadership();
-    }
+    // Clean up resources & Resign leadership (if you are leader)
+    azureLeaderElector.resignLeadership();
+    table.deleteProcessorEntity(currentJMVersion.get(), processorId, true);
 
     // Shutdown all schedulers
     shutdownSchedulers();
 
     if (coordinatorListener != null) {
+      coordinatorListener.onJobModelExpired();
+    }
+
+    if (coordinatorListener != null) {
       coordinatorListener.onCoordinatorStop();
     }
   }
@@ -217,7 +217,6 @@ public class AzureJobCoordinator implements JobCoordinator {
       if (!leaderBlob.publishBarrierState(state, azureLeaderElector.getLeaseId().get())) {
         LOG.info("Leader failed to publish the job model {}. Stopping the processor with PID: .", jobModel, processorId);
         stop();
-        table.deleteProcessorEntity(currentJMVersion.get(), processorId);
       }
       leaderBarrierScheduler.shutdown();
     };
@@ -374,7 +373,6 @@ public class AzureJobCoordinator implements JobCoordinator {
     if (!jmWrite || !barrierWrite || !processorWrite) {
       LOG.info("Leader failed to publish the job model {}. Stopping the processor with PID: .", jobModel, processorId);
       stop();
-      table.deleteProcessorEntity(currentJMVersion.get(), processorId);
     }
 
     LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
@@ -400,7 +398,6 @@ public class AzureJobCoordinator implements JobCoordinator {
     if (!jobModel.getContainers().containsKey(processorId)) {
       LOG.info("JobModel: {} does not contain the processorId: {}. Stopping the processor.", jobModel, processorId);
       stop();
-      table.deleteProcessorEntity(currentJMVersion.get(), processorId);
     } else {
       //Stop current work
       if (coordinatorListener != null) {
@@ -443,18 +440,24 @@ public class AzureJobCoordinator implements JobCoordinator {
   private void onNewJobModelConfirmed(final String nextJMVersion) {
     LOG.info("pid=" + processorId + "new version " + nextJMVersion + " of the job model got confirmed");
 
-    // Delete previous value
-    if (table.getEntity(currentJMVersion.get(), processorId) != null) {
-      table.deleteProcessorEntity(currentJMVersion.get(), processorId);
-    }
-    if (table.getEntity(INITIAL_STATE, processorId) != null) {
-      table.deleteProcessorEntity(INITIAL_STATE, processorId);
-    }
+    String prevVersion = currentJMVersion.get();
 
-    //Start heartbeating to new entry only when barrier reached.
+    //Start heart-beating to new entry only when barrier reached.
     //Changing the current job model version enables that since we are heartbeating to a row identified by the current job model version.
     currentJMVersion.getAndSet(nextJMVersion);
 
+    // Delete previous value
+    ProcessorEntity entity = table.getEntity(prevVersion, processorId);
+    if (entity != null) {
+      entity.setEtag("*");
+      table.deleteProcessorEntity(entity);
+    }
+    entity = table.getEntity(INITIAL_STATE, processorId);
+    if (entity != null) {
+      entity.setEtag("*");
+      table.deleteProcessorEntity(entity);
+    }
+
     //Start the container with the new model
     if (coordinatorListener != null) {
       coordinatorListener.onNewJobModel(processorId, jobModel);

http://git-wip-us.apache.org/repos/asf/samza/blob/95a71fb9/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java b/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java
index 9323bde..27c18b3 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java
@@ -59,4 +59,12 @@ public class ProcessorEntity extends TableServiceEntity {
   public boolean getIsLeader() {
     return isLeader;
   }
+
+  public String getJobModelVersion() {
+    return partitionKey;
+  }
+
+  public String getProcessorId() {
+    return rowKey;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/95a71fb9/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
index ded014f..235b1f8 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
@@ -20,6 +20,11 @@
 package org.apache.samza.coordinator.scheduler;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.samza.coordinator.data.BarrierState;
+import org.apache.samza.util.BlobUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -29,10 +34,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
-import org.apache.samza.coordinator.data.BarrierState;
-import org.apache.samza.util.BlobUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/samza/blob/95a71fb9/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java b/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java
index f49ce27..d5b8320 100644
--- a/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java
+++ b/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java
@@ -24,17 +24,17 @@ import com.microsoft.azure.storage.table.CloudTable;
 import com.microsoft.azure.storage.table.CloudTableClient;
 import com.microsoft.azure.storage.table.TableOperation;
 import com.microsoft.azure.storage.table.TableQuery;
-import java.net.URISyntaxException;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.samza.AzureClient;
 import org.apache.samza.AzureException;
 import org.apache.samza.coordinator.data.ProcessorEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
 
 /**
  *  Client side class that has a reference to Azure Table Storage.
@@ -44,17 +44,15 @@ import org.slf4j.LoggerFactory;
  *  ROW KEY = Unique entity ID for a group = Processor ID (for this case).
  */
 public class TableUtils {
-
   private static final Logger LOG = LoggerFactory.getLogger(TableUtils.class);
   private static final String PARTITION_KEY = "PartitionKey";
   private static final long LIVENESS_DEBOUNCE_TIME_SEC = 30;
   private final String initialState;
-  private final CloudTableClient tableClient;
   private final CloudTable table;
 
   public TableUtils(AzureClient client, String tableName, String initialState) {
     this.initialState = initialState;
-    tableClient = client.getTableClient();
+    CloudTableClient tableClient = client.getTableClient();
     try {
       table = tableClient.getTableReference(tableName);
       table.createIfNotExists();
@@ -97,8 +95,7 @@ public class TableUtils {
   public ProcessorEntity getEntity(String jmVersion, String pid) {
     try {
       TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
-      ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
-      return entity;
+      return table.execute(retrieveEntity).getResultAsType();
     } catch (StorageException e) {
       LOG.error("Azure storage exception while retrieving processor entity with job model version: " + jmVersion + "and pid: " + pid, e);
       throw new AzureException(e);
@@ -112,7 +109,6 @@ public class TableUtils {
    */
   public void updateHeartbeat(String jmVersion, String pid) {
     try {
-      Random rand = new Random();
       TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
       ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
       entity.updateLiveness();
@@ -145,14 +141,24 @@ public class TableUtils {
 
   /**
    * Deletes a specified row in the processor table.
+   *
+   * Note: Table service uses optimistic locking by default. Hence, if there is an update after retrieving the entity,
+   * then the delete operation will fail.
+   *
    * @param jmVersion Job model version of the processor row to be deleted.
    * @param pid Unique processor ID of the processor row to be deleted.
+   * @param force True, to disable optimistic locking on the table. False, otherwise. Setting to false may result in
+   *              AzureException when there is concurrent access to the table.
+   *
    * @throws AzureException If an Azure storage service error occurred.
    */
-  public void deleteProcessorEntity(String jmVersion, String pid) {
+  public void deleteProcessorEntity(String jmVersion, String pid, boolean force) {
     try {
       TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
       ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+      if (force) {
+        entity.setEtag("*");
+      }
       TableOperation remove = TableOperation.delete(entity);
       table.execute(remove);
     } catch (StorageException e) {
@@ -162,6 +168,25 @@ public class TableUtils {
   }
 
   /**
+   * Deletes a specified row in the processor table.
+   *
+   * Note: Table service uses optimistic locking by default. In order to disable it, set the ETag on the ProcessorEntity
+   * to "*" before invoking this method.
+   *
+   * @param entity ProcessorEntity that has to be deleted
+   * @throws AzureException If an Azure storage service error occurred.
+   */
+  public void deleteProcessorEntity(ProcessorEntity entity) {
+    try {
+      TableOperation remove = TableOperation.delete(entity);
+      table.execute(remove);
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception while deleting processor entity with job model version: " +
+          entity.getJobModelVersion() + "and pid: " + entity.getProcessorId(), e);
+      throw new AzureException(e);
+    }
+  }
+  /**
    * Retrieve all rows in a table with the given partition key.
    * @param partitionKey Job model version of the processors to be retrieved.
    * @return Iterable list of processor entities.