You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/01/17 18:10:47 UTC
samza git commit: SAMZA-1523: Cleanup table entries before shutting down the processor
Repository: samza
Updated Branches:
refs/heads/master fcd2b9a53 -> 95a71fb9e
SAMZA-1523: Cleanup table entries before shutting down the processor
Modified `TableUtils#deleteProcessorEntity` to provide an option to disable optimistic locking when calling the Azure Table Storage service.
sborya PawasChhokra nickpan47 Review please?
Author: navina <na...@apache.org>
Reviewers: Shanthoosh V <sv...@linkedin.com>, Boris S <bs...@linkedin.com>
Closes #379 from navina/azure-etag-fix
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/95a71fb9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/95a71fb9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/95a71fb9
Branch: refs/heads/master
Commit: 95a71fb9e234c9fc7fcb8a4ec0bbab33c9555738
Parents: fcd2b9a
Author: navina <na...@apache.org>
Authored: Wed Jan 17 10:11:40 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Jan 17 10:11:40 2018 -0800
----------------------------------------------------------------------
.../samza/coordinator/AzureJobCoordinator.java | 75 ++++++++++----------
.../samza/coordinator/data/ProcessorEntity.java | 8 +++
.../scheduler/JMVersionUpgradeScheduler.java | 9 +--
.../java/org/apache/samza/util/TableUtils.java | 49 +++++++++----
4 files changed, 89 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/95a71fb9/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
index 468705b..ca3384d 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -19,22 +19,10 @@
package org.apache.samza.coordinator;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.AzureClient;
-import org.apache.samza.config.AzureConfig;
-import org.apache.samza.coordinator.data.BarrierState;
import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.AzureConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
@@ -42,8 +30,8 @@ import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.runtime.ProcessorIdGenerator;
+import org.apache.samza.coordinator.data.BarrierState;
+import org.apache.samza.coordinator.data.ProcessorEntity;
import org.apache.samza.coordinator.scheduler.HeartbeatScheduler;
import org.apache.samza.coordinator.scheduler.JMVersionUpgradeScheduler;
import org.apache.samza.coordinator.scheduler.LeaderBarrierCompleteScheduler;
@@ -51,6 +39,8 @@ import org.apache.samza.coordinator.scheduler.LeaderLivenessCheckScheduler;
import org.apache.samza.coordinator.scheduler.LivenessCheckScheduler;
import org.apache.samza.coordinator.scheduler.RenewLeaseScheduler;
import org.apache.samza.coordinator.scheduler.SchedulerStateChangeListener;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.runtime.ProcessorIdGenerator;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
@@ -64,6 +54,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
/**
* Class that provides coordination mechanism for Samza standalone in Azure.
@@ -80,7 +82,6 @@ public class AzureJobCoordinator implements JobCoordinator {
private final TableUtils table;
private final Config config;
private final String processorId;
- private final AzureClient client;
private final AtomicReference<String> currentJMVersion;
private final AtomicBoolean versionUpgradeDetected;
private final HeartbeatScheduler heartbeat;
@@ -103,7 +104,7 @@ public class AzureJobCoordinator implements JobCoordinator {
processorId = createProcessorId(config);
currentJMVersion = new AtomicReference<>(INITIAL_STATE);
AzureConfig azureConfig = new AzureConfig(config);
- client = new AzureClient(azureConfig.getAzureConnectionString());
+ AzureClient client = new AzureClient(azureConfig.getAzureConnectionString());
leaderBlob = new BlobUtils(client, azureConfig.getAzureContainerName(), azureConfig.getAzureBlobName(), azureConfig.getAzureBlobLength());
errorHandler = (errorMsg) -> {
LOG.error(errorMsg);
@@ -149,19 +150,18 @@ public class AzureJobCoordinator implements JobCoordinator {
public void stop() {
LOG.info("Shutting down Azure job coordinator.");
- if (coordinatorListener != null) {
- coordinatorListener.onJobModelExpired();
- }
-
- // Resign leadership
- if (azureLeaderElector.amILeader()) {
- azureLeaderElector.resignLeadership();
- }
+ // Clean up resources & Resign leadership (if you are leader)
+ azureLeaderElector.resignLeadership();
+ table.deleteProcessorEntity(currentJMVersion.get(), processorId, true);
// Shutdown all schedulers
shutdownSchedulers();
if (coordinatorListener != null) {
+ coordinatorListener.onJobModelExpired();
+ }
+
+ if (coordinatorListener != null) {
coordinatorListener.onCoordinatorStop();
}
}
@@ -217,7 +217,6 @@ public class AzureJobCoordinator implements JobCoordinator {
if (!leaderBlob.publishBarrierState(state, azureLeaderElector.getLeaseId().get())) {
LOG.info("Leader failed to publish the job model {}. Stopping the processor with PID: .", jobModel, processorId);
stop();
- table.deleteProcessorEntity(currentJMVersion.get(), processorId);
}
leaderBarrierScheduler.shutdown();
};
@@ -374,7 +373,6 @@ public class AzureJobCoordinator implements JobCoordinator {
if (!jmWrite || !barrierWrite || !processorWrite) {
LOG.info("Leader failed to publish the job model {}. Stopping the processor with PID: .", jobModel, processorId);
stop();
- table.deleteProcessorEntity(currentJMVersion.get(), processorId);
}
LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
@@ -400,7 +398,6 @@ public class AzureJobCoordinator implements JobCoordinator {
if (!jobModel.getContainers().containsKey(processorId)) {
LOG.info("JobModel: {} does not contain the processorId: {}. Stopping the processor.", jobModel, processorId);
stop();
- table.deleteProcessorEntity(currentJMVersion.get(), processorId);
} else {
//Stop current work
if (coordinatorListener != null) {
@@ -443,18 +440,24 @@ public class AzureJobCoordinator implements JobCoordinator {
private void onNewJobModelConfirmed(final String nextJMVersion) {
LOG.info("pid=" + processorId + "new version " + nextJMVersion + " of the job model got confirmed");
- // Delete previous value
- if (table.getEntity(currentJMVersion.get(), processorId) != null) {
- table.deleteProcessorEntity(currentJMVersion.get(), processorId);
- }
- if (table.getEntity(INITIAL_STATE, processorId) != null) {
- table.deleteProcessorEntity(INITIAL_STATE, processorId);
- }
+ String prevVersion = currentJMVersion.get();
- //Start heartbeating to new entry only when barrier reached.
+ //Start heart-beating to new entry only when barrier reached.
//Changing the current job model version enables that since we are heartbeating to a row identified by the current job model version.
currentJMVersion.getAndSet(nextJMVersion);
+ // Delete previous value
+ ProcessorEntity entity = table.getEntity(prevVersion, processorId);
+ if (entity != null) {
+ entity.setEtag("*");
+ table.deleteProcessorEntity(entity);
+ }
+ entity = table.getEntity(INITIAL_STATE, processorId);
+ if (entity != null) {
+ entity.setEtag("*");
+ table.deleteProcessorEntity(entity);
+ }
+
//Start the container with the new model
if (coordinatorListener != null) {
coordinatorListener.onNewJobModel(processorId, jobModel);
http://git-wip-us.apache.org/repos/asf/samza/blob/95a71fb9/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java b/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java
index 9323bde..27c18b3 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java
@@ -59,4 +59,12 @@ public class ProcessorEntity extends TableServiceEntity {
public boolean getIsLeader() {
return isLeader;
}
+
+ public String getJobModelVersion() {
+ return partitionKey;
+ }
+
+ public String getProcessorId() {
+ return rowKey;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/95a71fb9/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
index ded014f..235b1f8 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
@@ -20,6 +20,11 @@
package org.apache.samza.coordinator.scheduler;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.samza.coordinator.data.BarrierState;
+import org.apache.samza.util.BlobUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -29,10 +34,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
-import org.apache.samza.coordinator.data.BarrierState;
-import org.apache.samza.util.BlobUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/95a71fb9/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java b/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java
index f49ce27..d5b8320 100644
--- a/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java
+++ b/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java
@@ -24,17 +24,17 @@ import com.microsoft.azure.storage.table.CloudTable;
import com.microsoft.azure.storage.table.CloudTableClient;
import com.microsoft.azure.storage.table.TableOperation;
import com.microsoft.azure.storage.table.TableQuery;
-import java.net.URISyntaxException;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.samza.AzureClient;
import org.apache.samza.AzureException;
import org.apache.samza.coordinator.data.ProcessorEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
* Client side class that has a reference to Azure Table Storage.
@@ -44,17 +44,15 @@ import org.slf4j.LoggerFactory;
* ROW KEY = Unique entity ID for a group = Processor ID (for this case).
*/
public class TableUtils {
-
private static final Logger LOG = LoggerFactory.getLogger(TableUtils.class);
private static final String PARTITION_KEY = "PartitionKey";
private static final long LIVENESS_DEBOUNCE_TIME_SEC = 30;
private final String initialState;
- private final CloudTableClient tableClient;
private final CloudTable table;
public TableUtils(AzureClient client, String tableName, String initialState) {
this.initialState = initialState;
- tableClient = client.getTableClient();
+ CloudTableClient tableClient = client.getTableClient();
try {
table = tableClient.getTableReference(tableName);
table.createIfNotExists();
@@ -97,8 +95,7 @@ public class TableUtils {
public ProcessorEntity getEntity(String jmVersion, String pid) {
try {
TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
- ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
- return entity;
+ return table.execute(retrieveEntity).getResultAsType();
} catch (StorageException e) {
LOG.error("Azure storage exception while retrieving processor entity with job model version: " + jmVersion + "and pid: " + pid, e);
throw new AzureException(e);
@@ -112,7 +109,6 @@ public class TableUtils {
*/
public void updateHeartbeat(String jmVersion, String pid) {
try {
- Random rand = new Random();
TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
entity.updateLiveness();
@@ -145,14 +141,24 @@ public class TableUtils {
/**
* Deletes a specified row in the processor table.
+ *
+ * Note: Table service uses optimistic locking by default. Hence, if there is an update after retrieving the entity,
+ * then the delete operation will fail.
+ *
* @param jmVersion Job model version of the processor row to be deleted.
* @param pid Unique processor ID of the processor row to be deleted.
+ * @param force True, to disable optimistic locking on the table. False, otherwise. Setting to false may result in
+ * AzureException when there is concurrent access to the table.
+ *
* @throws AzureException If an Azure storage service error occurred.
*/
- public void deleteProcessorEntity(String jmVersion, String pid) {
+ public void deleteProcessorEntity(String jmVersion, String pid, boolean force) {
try {
TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+ if (force) {
+ entity.setEtag("*");
+ }
TableOperation remove = TableOperation.delete(entity);
table.execute(remove);
} catch (StorageException e) {
@@ -162,6 +168,25 @@ public class TableUtils {
}
/**
+ * Deletes a specified row in the processor table.
+ *
+ * Note: Table service uses optimistic locking by default. In order to disable it, set the ETag on the ProcessorEntity
+ * to "*" before invoking this method.
+ *
+ * @param entity ProcessorEntity that has to be deleted
+ * @throws AzureException If an Azure storage service error occurred.
+ */
+ public void deleteProcessorEntity(ProcessorEntity entity) {
+ try {
+ TableOperation remove = TableOperation.delete(entity);
+ table.execute(remove);
+ } catch (StorageException e) {
+ LOG.error("Azure storage exception while deleting processor entity with job model version: " +
+ entity.getJobModelVersion() + "and pid: " + entity.getProcessorId(), e);
+ throw new AzureException(e);
+ }
+ }
+ /**
* Retrieve all rows in a table with the given partition key.
* @param partitionKey Job model version of the processors to be retrieved.
* @return Iterable list of processor entities.