Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2022/10/07 01:11:22 UTC

[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3572: [GOBBLIN-1718] Define DagActionStoreMonitor to listen for kill/resume…

ZihanLi58 commented on code in PR #3572:
URL: https://github.com/apache/gobblin/pull/3572#discussion_r989591115


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -123,46 +93,41 @@ protected void assignTopicPartitions() {
   associated with it), a given message itself will be partitioned and assigned to only one queue.
    */
   protected void processMessage(DecodeableKafkaRecord message) {
-    String specUri = (String) message.getKey();
-    SpecStoreChangeEvent value = (SpecStoreChangeEvent) message.getValue();
+    String key = (String) message.getKey();
+    GenericStoreChangeEvent value = (GenericStoreChangeEvent) message.getValue();
 
     Long timestamp = value.getTimestamp();
     String operation = value.getOperationType().name();
-    log.info("Processing message with specUri is {} timestamp is {} operation is {}", specUri, timestamp, operation);
 
-    // If we've already processed a message with this timestamp and spec uri before then skip duplicate message
-    String changeIdentifier = timestamp.toString() + specUri;
-    if (specChangesSeenCache.getIfPresent(changeIdentifier) != null) {
-      return;
-    }
+    log.debug("Processing message where specUri is {} timestamp is {} operation is {}", key, timestamp, operation);
 
-    // If event is a heartbeat type then log it and skip processing
-    if (operation == "HEARTBEAT") {
-      log.debug("Received heartbeat message from time {}", timestamp);
+    String changeIdentifier = timestamp + key;
+    if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, specChangesSeenCache, operation,
+        timestamp.toString())) {
       return;
     }
 
     Spec spec;
     URI specAsUri = null;
 
     try {
-      specAsUri = new URI(specUri);
+      specAsUri = new URI(key);
     } catch (URISyntaxException e) {
-      if (operation == "DELETE") {
-        log.warn("Could not create URI object for specUri {} due to error {}", specUri, e.getMessage());
+      if (operation.equals("DELETE")) {
+        log.warn("Could not create URI object for specUri {} due to error {}", key, e.getMessage());
         this.unexpectedErrors.mark();
         return;
       }
     }
 
-    spec = (operation != "DELETE") ? this.flowCatalog.getSpecWrapper(specAsUri) : null;
+    spec = (operation.equals("DELETE")) ? this.flowCatalog.getSpecWrapper(specAsUri) : null;

Review Comment:
   Do you mean !(operation.equals("DELETE"))?
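   For reference, a minimal sketch of the check this comment appears to suggest, reusing the names from the diff above (a sketch of the suggestion, not code from the PR):

       // Only look up the spec for non-DELETE operations; a deleted spec has nothing left to fetch.
       spec = !operation.equals("DELETE") ? this.flowCatalog.getSpecWrapper(specAsUri) : null;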



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+
+
+/**
+ * A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent} schema to process Kafka messages received
+ * from its corresponding consumer client. This monitor responds to requests to resume or delete a flow and acts as a
+ * connector between the API and execution layers of GaaS.
+ */
+@Slf4j
+public class DagActionStoreChangeMonitor extends HighLevelConsumer {
+  public static final String DAG_ACTION_CHANGE_MONITOR_PREFIX = "dagActionChangeStore";
+
+  // Metrics
+  ContextAwareMeter killsInvoked;
+  ContextAwareMeter resumesInvoked;
+  ContextAwareMeter unexpectedErrors;
+
+  protected CacheLoader<String, String> cacheLoader = new CacheLoader<String, String>() {
+    @Override
+    public String load(String key) throws Exception {
+      return key;
+    }
+  };
+
+  protected LoadingCache<String, String>
+      dagActionsSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build(cacheLoader);
+
+  @Inject
+  protected DagActionStore dagActionStore;
+
+  @Inject
+  protected DagManager dagManager;
+
+  public DagActionStoreChangeMonitor(String topic, Config config, int numThreads) {
+    // Differentiate group id for each host
+    super(topic, config.withValue(GROUP_ID_KEY,
+        ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())),
+        numThreads);
+  }
+
+  @Override
+  protected void assignTopicPartitions() {
+    ChangeMonitorUtils.assignTopicPartitionsHelper(this.topic, this.getGobblinKafkaConsumerClient());
+  }
+
+  @Override
+  /*
+  This class is multi-threaded and this message will be called by multiple threads, however any given message will be
+  partitioned and processed by only one thread (and corresponding queue).
+   */
+  protected void processMessage(DecodeableKafkaRecord message) {
+    String key = (String) message.getKey();
+    DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) message.getValue();
+
+    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String operation = value.getChangeEventIdentifier().getOperationType().name();
+    String flowGroup = value.getFlowGroup();
+    String flowName = value.getFlowName();
+    String flowExecutionId = value.getFlowExecutionId();
+
+    log.debug("Processing Dag Action message for flow group: {} name: {} executionId: {} timestamp {} operation {}",
+        flowGroup, flowName, flowExecutionId, timestamp, operation);
+
+    String changeIdentifier = timestamp + key;
+    if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, dagActionsSeenCache, operation,
+        timestamp.toString())) {
+      return;
+    }
+
+    // retrieve operation type from MySQL table OR from the event itself
+    DagActionStore.DagActionValue dagAction = null;
+    try {
+      dagAction = dagActionStore.getDagAction(flowGroup, flowName, flowExecutionId).getDagActionValue();

Review Comment:
   Again, if the operation is DELETE, you don't need to do anything here, right? Why do we want to fetch the dag action even in that case?
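   One way to read this suggestion, sketched with the identifiers already present in the diff; whether the dedupe cache should be updated before returning is an assumption, not something the PR shows:

       // Hypothetical early return: a DELETE event has no action left to execute, so skip the
       // store lookup entirely and just record the change identifier as seen.
       if (operation.equals("DELETE")) {
         dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
         return;
       }
       // ... the existing dagActionStore.getDagAction(...) lookup then runs only for non-DELETE events ...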



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -123,46 +93,41 @@ protected void assignTopicPartitions() {
   associated with it), a given message itself will be partitioned and assigned to only one queue.
    */
   protected void processMessage(DecodeableKafkaRecord message) {
-    String specUri = (String) message.getKey();
-    SpecStoreChangeEvent value = (SpecStoreChangeEvent) message.getValue();
+    String key = (String) message.getKey();
+    GenericStoreChangeEvent value = (GenericStoreChangeEvent) message.getValue();
 
     Long timestamp = value.getTimestamp();
     String operation = value.getOperationType().name();
-    log.info("Processing message with specUri is {} timestamp is {} operation is {}", specUri, timestamp, operation);
 
-    // If we've already processed a message with this timestamp and spec uri before then skip duplicate message
-    String changeIdentifier = timestamp.toString() + specUri;
-    if (specChangesSeenCache.getIfPresent(changeIdentifier) != null) {
-      return;
-    }
+    log.debug("Processing message where specUri is {} timestamp is {} operation is {}", key, timestamp, operation);
 
-    // If event is a heartbeat type then log it and skip processing
-    if (operation == "HEARTBEAT") {
-      log.debug("Received heartbeat message from time {}", timestamp);
+    String changeIdentifier = timestamp + key;
+    if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, specChangesSeenCache, operation,
+        timestamp.toString())) {
       return;
     }
 
     Spec spec;
     URI specAsUri = null;
 
     try {
-      specAsUri = new URI(specUri);
+      specAsUri = new URI(key);
     } catch (URISyntaxException e) {
-      if (operation == "DELETE") {
-        log.warn("Could not create URI object for specUri {} due to error {}", specUri, e.getMessage());
+      if (operation.equals("DELETE")) {

Review Comment:
   If it's not DELETE, shouldn't we log the error and return as well? The current implementation seems to fail the service directly.
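   A minimal sketch of the handling this comment points toward, assuming a malformed key should be dropped regardless of operation type (a sketch of the suggestion, not the PR's code):

       } catch (URISyntaxException e) {
         // Log and skip the record for any operation type rather than falling through with a null URI.
         log.warn("Could not create URI object for specUri {} due to error {}", key, e.getMessage());
         this.unexpectedErrors.mark();
         return;
       }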



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+
+
+/**
+ * A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent} schema to process Kafka messages received
+ * from its corresponding consumer client. This monitor responds to requests to resume or delete a flow and acts as a
+ * connector between the API and execution layers of GaaS.
+ */
+@Slf4j
+public class DagActionStoreChangeMonitor extends HighLevelConsumer {
+  public static final String DAG_ACTION_CHANGE_MONITOR_PREFIX = "dagActionChangeStore";
+
+  // Metrics
+  ContextAwareMeter killsInvoked;
+  ContextAwareMeter resumesInvoked;
+  ContextAwareMeter unexpectedErrors;
+
+  protected CacheLoader<String, String> cacheLoader = new CacheLoader<String, String>() {
+    @Override
+    public String load(String key) throws Exception {
+      return key;
+    }
+  };
+
+  protected LoadingCache<String, String>
+      dagActionsSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build(cacheLoader);
+
+  @Inject
+  protected DagActionStore dagActionStore;
+
+  @Inject
+  protected DagManager dagManager;
+
+  public DagActionStoreChangeMonitor(String topic, Config config, int numThreads) {
+    // Differentiate group id for each host
+    super(topic, config.withValue(GROUP_ID_KEY,
+        ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())),
+        numThreads);
+  }
+
+  @Override
+  protected void assignTopicPartitions() {
+    ChangeMonitorUtils.assignTopicPartitionsHelper(this.topic, this.getGobblinKafkaConsumerClient());
+  }
+
+  @Override
+  /*
+  This class is multi-threaded and this message will be called by multiple threads, however any given message will be
+  partitioned and processed by only one thread (and corresponding queue).
+   */
+  protected void processMessage(DecodeableKafkaRecord message) {

Review Comment:
   I feel you want to add metrics here to indicate we are continually processing messages and this service is healthy. No need to do it in this PR, but at least add a TODO here.
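   A sketch of the kind of liveness metric the comment describes; the meter name and where it is registered are assumptions, not part of the PR:

       // Hypothetical liveness meter; naming and wiring are illustrative only.
       ContextAwareMeter messagesProcessed;

       @Override
       protected void processMessage(DecodeableKafkaRecord message) {
         // TODO (per review): mark on every record so a stalled consumer is visible on dashboards.
         this.messagesProcessed.mark();
         // ... existing key/value decoding and dag-action handling continue unchanged ...
       }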



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org