You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/07/01 04:27:38 UTC
[pulsar] branch master updated: Functions metadata compaction
(#7377)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3d94553 Functions metadata compaction (#7377)
3d94553 is described below
commit 3d94553fe7f1f6ed17d3e115e1d08a02585c7a77
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue Jun 30 21:27:25 2020 -0700
Functions metadata compaction (#7377)
* Function workers re-direct call update requests to the leader
* Fixed test
* tests pass
* Working version
* Fix test
* Short circuit update
* Fix test
* Fix test
* Fix tests
* Added one more catch
* Added one more catch
* Seperated internal and external errors
* Fix test
* Address feedback
* Do not expose updateOnLeader to functions
* hide api
* hide api
* removed duplicate comments
* Do leadership changes in function metadata manager
* make the function sync
* Added more comments
* Throw error
* Changed name
* address comments
* Deleted unused classes
* Rework metadata manager
* Working
* Fix test
* A better way for test
* Address feedback
* Added an option to compact function metadata topic
* Address feedback
* Incorporate feedback
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../pulsar/functions/worker/WorkerConfig.java | 5 +
.../pulsar/functions/utils/FunctionCommon.java | 20 ++++
.../functions/worker/FunctionMetaDataManager.java | 109 +++++++++++++++------
.../worker/FunctionMetaDataTopicTailer.java | 10 +-
.../pulsar/functions/worker/SchedulerManager.java | 21 ++++
.../worker/FunctionMetaDataManagerTest.java | 59 ++++++++++-
6 files changed, 186 insertions(+), 38 deletions(-)
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index bb8d896..c056028 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -169,6 +169,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
)
private String functionMetadataTopicName;
@FieldContext(
+ category = CATEGORY_FUNC_METADATA_MNG,
+ doc = "Should the metadata topic be compacted?"
+ )
+ private Boolean useCompactedMetadataTopic = false;
+ @FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The web service url for function workers"
)
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index b8ac299..5a68855 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -341,6 +341,26 @@ public class FunctionCommon {
return String.format("%s/%s/%s", tenant, namespace, functionName);
}
+ public static String extractTenantFromFullyQualifiedName(String fqfn) {
+ return extractFromFullyQualifiedName(fqfn, 0);
+ }
+
+ public static String extractNamespaceFromFullyQualifiedName(String fqfn) {
+ return extractFromFullyQualifiedName(fqfn, 1);
+ }
+
+ public static String extractNameFromFullyQualifiedName(String fqfn) {
+ return extractFromFullyQualifiedName(fqfn, 2);
+ }
+
+ private static String extractFromFullyQualifiedName(String fqfn, int index) {
+ String[] parts = fqfn.split("/");
+ if (parts.length >= 3) {
+ return parts[index];
+ }
+ throw new RuntimeException("Invalid Fully Qualified Function Name " + fqfn);
+ }
+
public static Class<?> getTypeArg(String className, Class<?> funClass, ClassLoader classLoader)
throws ClassNotFoundException {
Class<?> loadedClass = classLoader.loadClass(className);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 795a735..abe2b38 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.*;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Request;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import java.io.IOException;
import java.util.Collection;
@@ -68,7 +69,10 @@ public class FunctionMetaDataManager implements AutoCloseable {
// Note that this variable serves a double duty. A non-null value
// implies we are the leader, while a null value means we are not the leader
private Producer exclusiveLeaderProducer;
- private MessageId lastMessageSeen = MessageId.earliest;
+ @Getter
+ private volatile MessageId lastMessageSeen = MessageId.earliest;
+
+ private static final String versionTag = "version";
@Getter
private CompletableFuture<Void> isInitialized = new CompletableFuture<>();
@@ -206,14 +210,30 @@ public class FunctionMetaDataManager implements AutoCloseable {
} else {
needsScheduling = processUpdate(functionMetaData);
}
- Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
- .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
- .setFunctionMetaData(functionMetaData)
- .setWorkerId(workerConfig.getWorkerId())
- .setRequestId(UUID.randomUUID().toString())
- .build();
+ byte[] toWrite;
+ if (workerConfig.getUseCompactedMetadataTopic()) {
+ if (delete) {
+ toWrite = "".getBytes();
+ } else {
+ toWrite = functionMetaData.toByteArray();
+ }
+ } else {
+ Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+ .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+ .setFunctionMetaData(functionMetaData)
+ .setWorkerId(workerConfig.getWorkerId())
+ .setRequestId(UUID.randomUUID().toString())
+ .build();
+ toWrite = serviceRequest.toByteArray();
+ }
try {
- lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+ TypedMessageBuilder builder = exclusiveLeaderProducer.newMessage()
+ .value(toWrite)
+ .property(versionTag, Long.toString(functionMetaData.getVersion()));
+ if (workerConfig.getUseCompactedMetadataTopic()) {
+ builder = builder.key(FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails()));
+ }
+ lastMessageSeen = builder.send();
} catch (Exception e) {
log.error("Could not write into Function Metadata topic", e);
throw new IllegalStateException("Internal Error updating function at the leader", e);
@@ -290,19 +310,10 @@ public class FunctionMetaDataManager implements AutoCloseable {
*/
public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
try {
- Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
- if (log.isDebugEnabled()) {
- log.debug("Received Service Request: {}", serviceRequest);
- }
- switch (serviceRequest.getServiceRequestType()) {
- case UPDATE:
- this.processUpdate(serviceRequest.getFunctionMetaData());
- break;
- case DELETE:
- this.proccessDeregister(serviceRequest.getFunctionMetaData());
- break;
- default:
- log.warn("Received request with unrecognized type: {}", serviceRequest);
+ if (workerConfig.getUseCompactedMetadataTopic()) {
+ processCompactedMetaDataTopicMessage(message);
+ } else {
+ processUncompactedMetaDataTopicMessage(message);
}
} catch (IllegalArgumentException e) {
// Its ok. Nothing much we can do about it
@@ -310,6 +321,37 @@ public class FunctionMetaDataManager implements AutoCloseable {
lastMessageSeen = message.getMessageId();
}
+ private void processUncompactedMetaDataTopicMessage(Message<byte[]> message) throws IOException {
+ Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
+ if (log.isDebugEnabled()) {
+ log.debug("Received Service Request: {}", serviceRequest);
+ }
+ switch (serviceRequest.getServiceRequestType()) {
+ case UPDATE:
+ this.processUpdate(serviceRequest.getFunctionMetaData());
+ break;
+ case DELETE:
+ this.proccessDeregister(serviceRequest.getFunctionMetaData());
+ break;
+ default:
+ log.warn("Received request with unrecognized type: {}", serviceRequest);
+ }
+ }
+
+ private void processCompactedMetaDataTopicMessage(Message<byte[]> message) throws IOException {
+ long version = Long.valueOf(message.getProperty(versionTag));
+ String tenant = FunctionCommon.extractTenantFromFullyQualifiedName(message.getKey());
+ String namespace = FunctionCommon.extractNamespaceFromFullyQualifiedName(message.getKey());
+ String functionName = FunctionCommon.extractNameFromFullyQualifiedName(message.getKey());
+ if (message.getData() == null || message.getData().length == 0) {
+ // this is a delete message
+ this.proccessDeregister(tenant, namespace, functionName, version);
+ } else {
+ FunctionMetaData functionMetaData = FunctionMetaData.parseFrom(message.getData());
+ this.processUpdate(functionMetaData);
+ }
+ }
+
/**
* Private methods for internal use. Should not be used outside of this class
*/
@@ -336,25 +378,29 @@ public class FunctionMetaDataManager implements AutoCloseable {
@VisibleForTesting
synchronized boolean proccessDeregister(FunctionMetaData deregisterRequestFs) throws IllegalArgumentException {
-
String functionName = deregisterRequestFs.getFunctionDetails().getName();
String tenant = deregisterRequestFs.getFunctionDetails().getTenant();
String namespace = deregisterRequestFs.getFunctionDetails().getNamespace();
+ return proccessDeregister(tenant, namespace, functionName, deregisterRequestFs.getVersion());
+ }
+
+ synchronized boolean proccessDeregister(String tenant, String namespace,
+ String functionName, long version) throws IllegalArgumentException {
boolean needsScheduling = false;
- log.debug("Process deregister request: {}", deregisterRequestFs);
+ log.debug("Process deregister request: {}/{}/{}/{}", tenant, namespace, functionName, version);
// Check if we still have this function. Maybe already deleted by someone else
- if (this.containsFunctionMetaData(deregisterRequestFs)) {
+ if (this.containsFunctionMetaData(tenant, namespace, functionName)) {
// check if request is outdated
- if (!isRequestOutdated(deregisterRequestFs)) {
+ if (!isRequestOutdated(tenant, namespace, functionName, version)) {
this.functionMetaDataMap.get(tenant).get(namespace).remove(functionName);
needsScheduling = true;
} else {
if (log.isDebugEnabled()) {
log.debug("{}/{}/{} Ignoring outdated request version: {}", tenant, namespace, functionName,
- deregisterRequestFs.getVersion());
+ version);
}
throw new IllegalArgumentException("Delete request ignored because it is out of date. Please try again.");
}
@@ -393,9 +439,14 @@ public class FunctionMetaDataManager implements AutoCloseable {
private boolean isRequestOutdated(FunctionMetaData requestFunctionMetaData) {
Function.FunctionDetails functionDetails = requestFunctionMetaData.getFunctionDetails();
- FunctionMetaData currentFunctionMetaData = this.functionMetaDataMap.get(functionDetails.getTenant())
- .get(functionDetails.getNamespace()).get(functionDetails.getName());
- return currentFunctionMetaData.getVersion() >= requestFunctionMetaData.getVersion();
+ return isRequestOutdated(functionDetails.getTenant(), functionDetails.getNamespace(),
+ functionDetails.getName(), requestFunctionMetaData.getVersion());
+ }
+
+ private boolean isRequestOutdated(String tenant, String namespace, String functionName, long version) {
+ FunctionMetaData currentFunctionMetaData = this.functionMetaDataMap.get(tenant)
+ .get(namespace).get(functionName);
+ return currentFunctionMetaData.getVersion() >= version;
}
@VisibleForTesting
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
index 8b51971..0974835 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
@@ -29,7 +29,6 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.functions.proto.Request.ServiceRequest;
@Slf4j
public class FunctionMetaDataTopicTailer
@@ -131,11 +130,14 @@ public class FunctionMetaDataTopicTailer
public static Reader createReader(WorkerConfig workerConfig, ReaderBuilder readerBuilder,
MessageId startMessageId) throws PulsarClientException {
- return readerBuilder
+ ReaderBuilder builder = readerBuilder
.topic(workerConfig.getFunctionMetadataTopic())
.startMessageId(startMessageId)
.readerName(workerConfig.getWorkerId() + "-function-metadata-tailer")
- .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-tailer")
- .create();
+ .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-tailer");
+ if (workerConfig.getUseCompactedMetadataTopic()) {
+ builder = builder.readCompacted(true);
+ }
+ return builder.create();
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 0b8eeb2..0cd682a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -112,6 +112,8 @@ public class SchedulerManager implements AutoCloseable {
@Getter
private MessageId lastMessageProduced = null;
+ private MessageId metadataTopicLastMessage = MessageId.earliest;
+
public SchedulerManager(WorkerConfig workerConfig,
PulsarClient pulsarClient,
PulsarAdmin admin,
@@ -224,6 +226,13 @@ public class SchedulerManager implements AutoCloseable {
isCompactionNeeded.set(false);
}
}, scheduleFrequencySec, scheduleFrequencySec, TimeUnit.SECONDS);
+
+ executor.scheduleWithFixedDelay(() -> {
+ if (leaderService.isLeader() && metadataTopicLastMessage.compareTo(functionMetaDataManager.getLastMessageSeen()) != 0) {
+ metadataTopicLastMessage = functionMetaDataManager.getLastMessageSeen();
+ compactFunctionMetadataTopic();
+ }
+ }, scheduleFrequencySec, scheduleFrequencySec, TimeUnit.SECONDS);
}
}
@@ -337,6 +346,18 @@ public class SchedulerManager implements AutoCloseable {
}
}
+ private void compactFunctionMetadataTopic() {
+ if (this.admin != null) {
+ try {
+ this.admin.topics().triggerCompaction(workerConfig.getFunctionMetadataTopic());
+ } catch (PulsarAdminException e) {
+ log.error("Failed to trigger compaction", e);
+ scheduledExecutorService.schedule(() -> compactFunctionMetadataTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC,
+ TimeUnit.SECONDS);
+ }
+ }
+ }
+
private MessageId publishNewAssignment(Assignment assignment, boolean deleted) {
try {
String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index b5fb0df..25e62f0 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.*;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -34,12 +35,25 @@ import org.testng.annotations.Test;
public class FunctionMetaDataManagerTest {
+ static byte[] producerByteArray;
+
private static PulsarClient mockPulsarClient() throws PulsarClientException {
ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
when(builder.topic(anyString())).thenReturn(builder);
when(builder.producerName(anyString())).thenReturn(builder);
- when(builder.create()).thenReturn(mock(Producer.class));
+ Producer producer = mock(Producer.class);
+ TypedMessageBuilder messageBuilder = mock(TypedMessageBuilder.class);
+ when(messageBuilder.key(anyString())).thenReturn(messageBuilder);
+ doAnswer(invocation -> {
+ Object arg0 = invocation.getArgument(0);
+ FunctionMetaDataManagerTest.producerByteArray = (byte[])arg0;
+ return messageBuilder;
+ }).when(messageBuilder).value(any());
+ when(messageBuilder.property(anyString(), anyString())).thenReturn(messageBuilder);
+ when(producer.newMessage()).thenReturn(messageBuilder);
+
+ when(builder.create()).thenReturn(producer);
PulsarClient client = mock(PulsarClient.class);
when(client.newProducer()).thenReturn(builder);
@@ -86,10 +100,20 @@ public class FunctionMetaDataManagerTest {
}
@Test
- public void testUpdateIfLeaderFunction() throws PulsarClientException {
+ public void testUpdateIfLeaderFunctionWithoutCompaction() throws PulsarClientException {
+ testUpdateIfLeaderFunction(false);
+ }
+
+ @Test
+ public void testUpdateIfLeaderFunctionWithCompaction() throws PulsarClientException {
+ testUpdateIfLeaderFunction(true);
+ }
+
+ private void testUpdateIfLeaderFunction(boolean compact) throws PulsarClientException {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
+ workerConfig.setUseCompactedMetadataTopic(compact);
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
@@ -110,6 +134,11 @@ public class FunctionMetaDataManagerTest {
functionMetaDataManager.acquireLeadership();
// Now w should be able to really update
functionMetaDataManager.updateFunctionOnLeader(m1, false);
+ if (compact) {
+ Assert.assertTrue(Arrays.equals(m1.toByteArray(), producerByteArray));
+ } else {
+ Assert.assertFalse(Arrays.equals(m1.toByteArray(), producerByteArray));
+ }
// outdated request
try {
@@ -119,15 +148,30 @@ public class FunctionMetaDataManagerTest {
Assert.assertEquals(e.getMessage(), "Update request ignored because it is out of date. Please try again.");
}
// udpate with new version
- m1 = m1.toBuilder().setVersion(2).build();
- functionMetaDataManager.updateFunctionOnLeader(m1, false);
+ Function.FunctionMetaData m2 = m1.toBuilder().setVersion(2).build();
+ functionMetaDataManager.updateFunctionOnLeader(m2, false);
+ if (compact) {
+ Assert.assertTrue(Arrays.equals(m2.toByteArray(), producerByteArray));
+ } else {
+ Assert.assertFalse(Arrays.equals(m2.toByteArray(), producerByteArray));
+ }
+ }
+
+ @Test
+ public void deregisterFunctionWithoutCompaction() throws PulsarClientException {
+ deregisterFunction(false);
}
@Test
- public void deregisterFunction() throws PulsarClientException {
+ public void deregisterFunctionWithCompaction() throws PulsarClientException {
+ deregisterFunction(true);
+ }
+
+ private void deregisterFunction(boolean compact) throws PulsarClientException {
SchedulerManager mockedScheduler = mock(SchedulerManager.class);
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
+ workerConfig.setUseCompactedMetadataTopic(compact);
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mockedScheduler,
@@ -170,6 +214,11 @@ public class FunctionMetaDataManagerTest {
m1 = m1.toBuilder().setVersion(2).build();
functionMetaDataManager.updateFunctionOnLeader(m1, true);
verify(mockedScheduler, times(2)).schedule();
+ if (compact) {
+ Assert.assertTrue(Arrays.equals("".getBytes(), producerByteArray));
+ } else {
+ Assert.assertFalse(Arrays.equals(m1.toByteArray(), producerByteArray));
+ }
}
@Test