You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/07/24 16:01:58 UTC
samza git commit: SAMZA-1773: Side inputs for local stores
Repository: samza
Updated Branches:
refs/heads/master 9f8d593c4 -> fa56b15dc
SAMZA-1773: Side inputs for local stores
prateekm vjagadish
Please take a look.
I will update the PR with unit tests for TaskSideInputStorageManager and the utility functions.
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Author: Prateek Maheshwari <pm...@apache.org>
Author: Prateek Maheshwari <pm...@pmaheshw-mn2.linkedin.biz>
Reviewers: Cameron Lee <ca...@linkedin.com>, Jagadish Venkatraman <vj...@gmail.com>, Shanthoosh Venkatraman <sv...@linkedin.com>
Closes #570 from bharathkk/side-input-v3
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fa56b15d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fa56b15d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fa56b15d
Branch: refs/heads/master
Commit: fa56b15dc55f006cda57eec7d8dfc999db8b94fb
Parents: 9f8d593
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Authored: Tue Jul 24 09:01:09 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Tue Jul 24 09:01:09 2018 -0700
----------------------------------------------------------------------
.../samza/storage/SideInputsProcessor.java | 46 +++
.../storage/SideInputsProcessorFactory.java | 45 +++
.../java/org/apache/samza/table/TableSpec.java | 44 ++-
.../apache/samza/config/JavaStorageConfig.java | 47 +++
.../apache/samza/container/TaskContextImpl.java | 15 +-
.../org/apache/samza/execution/JobNode.java | 20 +-
.../samza/storage/StorageManagerUtil.java | 138 +++++++
.../storage/TaskSideInputStorageManager.java | 379 +++++++++++++++++++
.../org/apache/samza/config/StorageConfig.scala | 12 +
.../org/apache/samza/config/StreamConfig.scala | 2 +-
.../apache/samza/container/SamzaContainer.scala | 66 +++-
.../apache/samza/container/TaskInstance.scala | 130 +++++--
.../samza/storage/TaskStorageManager.scala | 96 +----
.../scala/org/apache/samza/util/FileUtil.scala | 8 +-
.../org/apache/samza/util/ScalaJavaUtil.scala | 12 +
.../main/scala/org/apache/samza/util/Util.scala | 20 +
.../org/apache/samza/task/TestAsyncRunLoop.java | 2 +-
.../samza/container/TestTaskInstance.scala | 5 +-
.../samza/storage/TestTaskStorageManager.scala | 25 +-
.../kv/inmemory/InMemoryTableDescriptor.java | 3 +-
.../storage/kv/RocksDbTableDescriptor.java | 3 +-
.../kv/BaseLocalStoreBackedTableDescriptor.java | 21 +
.../kv/BaseLocalStoreBackedTableProvider.java | 12 +
23 files changed, 983 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessor.java b/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessor.java
new file mode 100644
index 0000000..d2dcac5
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.Serializable;
+import java.util.Collection;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+
+/**
+ * Processing logic for the side inputs of a store.
+ *
+ * <p>Given one message from a side input stream of a store, together with the current contents of that store,
+ * an implementation computes the key-value entries that should be written to the store.
+ */
+@InterfaceStability.Unstable
+@FunctionalInterface
+public interface SideInputsProcessor extends Serializable {
+
+  /**
+   * Processes a single incoming side input message for {@code store}.
+   *
+   * @param message the incoming side input message envelope
+   * @param store the store associated with the incoming message envelope
+   * @return the {@link Entry}s to be written to {@code store}
+   */
+  Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessorFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessorFactory.java b/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessorFactory.java
new file mode 100644
index 0000000..fb7fc2c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.Serializable;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+/**
+ * Builds {@link SideInputsProcessor} instances.
+ *
+ * <p>Each invocation of {@link #getSideInputsProcessor(Config, MetricsRegistry)} is expected to return a
+ * new instance rather than a shared one.
+ */
+@InterfaceStability.Unstable
+@FunctionalInterface
+public interface SideInputsProcessorFactory extends Serializable {
+
+  /**
+   * Creates a fresh {@link SideInputsProcessor}.
+   *
+   * @param config the configuration
+   * @param metricsRegistry the metrics registry
+   * @return a new {@link SideInputsProcessor} instance
+   */
+  SideInputsProcessor getSideInputsProcessor(Config config, MetricsRegistry metricsRegistry);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
index ba57c2f..82577c4 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
@@ -21,10 +21,12 @@ package org.apache.samza.table;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.storage.SideInputsProcessor;
/**
@@ -52,6 +54,8 @@ public class TableSpec implements Serializable {
* once during startup in SamzaContainer. They don't need to be deserialized here on a per-task basis
*/
private transient final KVSerde serde;
+ private transient final List<String> sideInputs;
+ private transient final SideInputsProcessor sideInputsProcessor;
private transient final Map<String, String> config = new HashMap<>();
/**
@@ -61,6 +65,8 @@ public class TableSpec implements Serializable {
this.id = null;
this.serde = null;
this.tableProviderFactoryClassName = null;
+ this.sideInputs = null;
+ this.sideInputsProcessor = null;
}
/**
@@ -71,12 +77,28 @@ public class TableSpec implements Serializable {
* @param serde the serde
* @param config implementation specific configuration
*/
- public TableSpec(String tableId, KVSerde serde, String tableProviderFactoryClassName,
- Map<String, String> config) {
+  public TableSpec(String tableId, KVSerde serde, String tableProviderFactoryClassName, Map<String, String> config) {
+    this(tableId, serde, tableProviderFactoryClassName, config, Collections.emptyList(), null);
+  }
+
+  /**
+   * Constructs a {@link TableSpec}
+   *
+   * @param tableId Id of the table
+   * @param tableProviderFactoryClassName table provider factory
+   * @param serde the serde
+   * @param config implementation specific configuration
+   * @param sideInputs list of side inputs for the table; {@code null} is treated as no side inputs. The list is
+   *                   copied, so later changes to the caller's list are not reflected in this spec.
+   * @param sideInputsProcessor side input processor for the table; may be {@code null}
+   */
+  public TableSpec(String tableId, KVSerde serde, String tableProviderFactoryClassName, Map<String, String> config,
+      List<String> sideInputs, SideInputsProcessor sideInputsProcessor) {
this.id = tableId;
this.serde = serde;
this.tableProviderFactoryClassName = tableProviderFactoryClassName;
this.config.putAll(config);
+    // Copy defensively, the same way config is copied above, so the spec cannot be mutated via the caller's list.
+    this.sideInputs = sideInputs == null
+        ? Collections.<String>emptyList()
+        : Collections.unmodifiableList(new java.util.ArrayList<>(sideInputs));
+    this.sideInputsProcessor = sideInputsProcessor;
}
/**
@@ -113,6 +135,24 @@ public class TableSpec implements Serializable {
return Collections.unmodifiableMap(config);
}
+  /**
+   * Get the list of side inputs for the table.
+   *
+   * @return an unmodifiable {@link List} of side input streams; empty (never {@code null}) if the table has none
+   */
+  public List<String> getSideInputs() {
+    // sideInputs is transient and the no-arg constructor leaves it null, so guard against null here.
+    // The view is unmodifiable, matching getConfig().
+    return sideInputs == null ? Collections.<String>emptyList() : Collections.unmodifiableList(sideInputs);
+  }
+
+  /**
+   * Get the {@link SideInputsProcessor} associated with the table.
+   *
+   * @return the {@link SideInputsProcessor}, or {@code null} if none was provided
+   */
+  public SideInputsProcessor getSideInputsProcessor() {
+    return sideInputsProcessor;
+  }
+
@Override
public boolean equals(Object o) {
if (this == o) {
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
index bbf0ccf..1e3dbff 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
@@ -20,7 +20,11 @@
package org.apache.samza.config;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.execution.StreamManager;
@@ -43,6 +47,10 @@ public class JavaStorageConfig extends MapConfig {
private static final String ACCESSLOG_ENABLED = "stores.%s.accesslog.enabled";
private static final int DEFAULT_ACCESSLOG_SAMPLING_RATIO = 50;
+ public static final String SIDE_INPUTS = "stores.%s.side.inputs";
+ public static final String SIDE_INPUTS_PROCESSOR_FACTORY = "stores.%s.side.inputs.processor.factory";
+ public static final String SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE = "stores.%s.side.inputs.processor.serialized.instance";
+
public JavaStorageConfig(Config config) {
super(config);
}
@@ -126,4 +134,43 @@ public class JavaStorageConfig extends MapConfig {
public String getChangelogSystem() {
return get(CHANGELOG_SYSTEM, get(JobConfig.JOB_DEFAULT_SYSTEM(), null));
}
+
+  /**
+   * Returns the side input streams configured for {@code storeName}.
+   *
+   * <p>The value is a comma separated list. Each entry is trimmed, and blank entries are dropped. Each side input
+   * is either a {@code streamId} or of the form {@code systemName.streamName},
+   * e.g. {@code stores.storeName.side.inputs = kafka.topicA, mySystem.topicB}.
+   *
+   * @param storeName name of the store
+   * @return the store's side input streams, or an empty list if none are configured
+   */
+  public List<String> getSideInputs(String storeName) {
+    String configuredInputs = get(String.format(SIDE_INPUTS, storeName), null);
+    if (configuredInputs == null) {
+      return Collections.emptyList();
+    }
+
+    List<String> sideInputs = new ArrayList<>();
+    for (String rawInput : configuredInputs.split(",")) {
+      String input = rawInput.trim();
+      if (!input.isEmpty()) {
+        sideInputs.add(input);
+      }
+    }
+    return sideInputs;
+  }
+
+ /**
+ * Gets the SideInputsProcessorFactory associated with the {@code storeName}.
+ *
+ * @param storeName name of the store
+ * @return the class name of SideInputsProcessorFactory if present, null otherwise
+ */
+ public String getSideInputsProcessorFactory(String storeName) {
+ return get(String.format(SIDE_INPUTS_PROCESSOR_FACTORY, storeName), null);
+ }
+
+ /**
+ * Gets the serialized instance of SideInputsProcessor associated with the {@code storeName}.
+ *
+ * @param storeName name of the store
+ * @return the serialized instance of SideInputsProcessor if present, null otherwise
+ */
+ public String getSideInputsProcessorSerializedInstance(String storeName) {
+ return get(String.format(SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, storeName), null);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
index 0d76a33..d65be4c 100644
--- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
@@ -20,10 +20,10 @@
package org.apache.samza.container;
import com.google.common.collect.ImmutableSet;
+import java.util.function.Function;
import org.apache.samza.checkpoint.OffsetManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.ReadableMetricsRegistry;
-import org.apache.samza.storage.TaskStorageManager;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemStreamPartition;
@@ -48,7 +48,7 @@ public class TaskContextImpl implements TaskContext {
private final SamzaContainerContext containerContext;
private final Set<SystemStreamPartition> systemStreamPartitions;
private final OffsetManager offsetManager;
- private final TaskStorageManager storageManager;
+ private final Function<String, KeyValueStore> kvStoreSupplier;
private final TableManager tableManager;
private final JobModel jobModel;
private final StreamMetadataCache streamMetadataCache;
@@ -62,7 +62,7 @@ public class TaskContextImpl implements TaskContext {
SamzaContainerContext containerContext,
Set<SystemStreamPartition> systemStreamPartitions,
OffsetManager offsetManager,
- TaskStorageManager storageManager,
+ Function<String, KeyValueStore> kvStoreSupplier,
TableManager tableManager,
JobModel jobModel,
StreamMetadataCache streamMetadataCache,
@@ -72,7 +72,7 @@ public class TaskContextImpl implements TaskContext {
this.containerContext = containerContext;
this.systemStreamPartitions = ImmutableSet.copyOf(systemStreamPartitions);
this.offsetManager = offsetManager;
- this.storageManager = storageManager;
+ this.kvStoreSupplier = kvStoreSupplier;
this.tableManager = tableManager;
this.jobModel = jobModel;
this.streamMetadataCache = streamMetadataCache;
@@ -91,12 +91,11 @@ public class TaskContextImpl implements TaskContext {
@Override
public KeyValueStore getStore(String storeName) {
- if (storageManager != null) {
- return (KeyValueStore) storageManager.apply(storeName);
- } else {
+ // Stores are resolved through kvStoreSupplier; a null result is logged as "no store found" and returned as-is.
+ // NOTE(review): unlike the removed storageManager null check, kvStoreSupplier itself is dereferenced
+ // unconditionally — confirm callers never pass a null supplier.
+ KeyValueStore store = kvStoreSupplier.apply(storeName);
+ if (store == null) {
LOG.warn("No store found for name: {}", storeName);
- return null;
}
+ return store;
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 6507996..288b1a1 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -140,7 +140,7 @@ public class JobNode {
inputs.add(formattedSystemStream);
}
}
- configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
+
if (!broadcasts.isEmpty()) {
// TODO: remove this once we support defining broadcast input stream in high-level
// task.broadcast.input should be generated by the planner in the future.
@@ -179,6 +179,24 @@ public class JobNode {
configs.putAll(TableConfigGenerator.generateConfigsForTableSpecs(tables));
+ // Add side inputs to the inputs and mark the stream as bootstrap
+ tables.forEach(tableSpec -> {
+ List<String> sideInputs = tableSpec.getSideInputs();
+ if (sideInputs != null && !sideInputs.isEmpty()) {
+ sideInputs.stream()
+ .map(sideInput -> Util.getSystemStreamFromNameOrId(config, sideInput))
+ .forEach(systemStream -> {
+ inputs.add(Util.getNameFromSystemStream(systemStream));
+ configs.put(String.format(StreamConfig.STREAM_PREFIX() + StreamConfig.BOOTSTRAP(),
+ systemStream.getSystem(), systemStream.getStream()), "true");
+ });
+ }
+ });
+
+ configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
+
+ log.info("Job {} has generated configs {}", jobName, configs);
+
String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
// Disallow user specified job inputs/outputs. This info comes strictly from the user application.
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
new file mode 100644
index 0000000..731a84d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Static helpers shared by storage managers: computing the offset to resume a store from, and checking
+ * store directories and their offset files.
+ */
+public class StorageManagerUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(StorageManagerUtil.class);
+
+  /**
+   * Fetch the starting offset for the input {@link SystemStreamPartition}
+   *
+   * Note: The method doesn't respect {@link org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT()} and
+   * {@link org.apache.samza.config.StreamConfig#CONSUMER_RESET_OFFSET()} configurations. It will use the locally
+   * checkpointed offset if it is valid, or fall back to oldest offset of the stream. If the two offsets cannot be
+   * compared by {@code admin}, the oldest offset is used.
+   *
+   * @param ssp system stream partition for which starting offset is requested
+   * @param admin system admin associated with the ssp
+   * @param fileOffset local file offset for the ssp, or null if there is none
+   * @param oldestOffset oldest offset for the ssp from the source
+   * @return starting offset for the incoming {@link SystemStreamPartition}
+   */
+  public static String getStartingOffset(
+      SystemStreamPartition ssp, SystemAdmin admin, String fileOffset, String oldestOffset) {
+    if (fileOffset == null) {
+      return oldestOffset;
+    }
+
+    // File offset was the last message written to the local checkpoint that is also reflected in the store,
+    // so we start with the NEXT offset
+    String resumeOffset = admin.getOffsetsAfter(ImmutableMap.of(ssp, fileOffset)).get(ssp);
+
+    // offsetComparator returns a boxed Integer that is null when the offsets are not comparable;
+    // unboxing it directly (as in "compare(...) <= 0") would throw a NullPointerException.
+    Integer comparison = admin.offsetComparator(oldestOffset, resumeOffset);
+    if (comparison == null) {
+      LOG.warn("Local store offset {} is not comparable with the oldest offset {} of the source stream."
+          + " Falling back to the oldest offset.", resumeOffset, oldestOffset);
+      return oldestOffset;
+    }
+
+    if (comparison <= 0) {
+      return resumeOffset;
+    }
+
+    // If the offset we plan to use is older than the oldest offset, just use the oldest offset.
+    // This can happen with source of the store(changelog, etc) configured with a TTL cleanup policy
+    LOG.warn("Local store offset {} is lower than the oldest offset {} of the source stream."
+        + " The values between these offsets cannot be restored.", resumeOffset, oldestOffset);
+    return oldestOffset;
+  }
+
+  /**
+   * Checks if the store is stale. If the time elapsed since the last modified time of the offset file is greater than
+   * or equal to the {@code storeDeleteRetentionInMs}, then the store is considered stale.
+   *
+   * @param storeDir the base directory of the store
+   * @param offsetFileName the offset file name
+   * @param storeDeleteRetentionInMs store delete retention in millis
+   * @param currentTimeMs current time in ms
+   * @return true if the store is stale, false otherwise (including when {@code storeDir} does not exist)
+   */
+  public static boolean isStaleStore(
+      File storeDir, String offsetFileName, long storeDeleteRetentionInMs, long currentTimeMs) {
+    String storePath = storeDir.toPath().toString();
+    if (!storeDir.exists()) {
+      LOG.info("Storage partition directory: {} does not exist.", storePath);
+      return false;
+    }
+
+    // File#lastModified returns 0L when the offset file does not exist, so a missing offset file
+    // effectively makes the store stale.
+    long offsetFileLastModifiedTime = new File(storeDir, offsetFileName).lastModified();
+    boolean isStale = (currentTimeMs - offsetFileLastModifiedTime) >= storeDeleteRetentionInMs;
+    if (isStale) {
+      LOG.info("Store: {} is stale since lastModifiedTime of offset file: {}, is older than store deleteRetentionMs: {}.",
+          storePath, offsetFileLastModifiedTime, storeDeleteRetentionInMs);
+    }
+    return isStale;
+  }
+
+  /**
+   * An offset file associated with logged store {@code storeDir} is valid if it exists and is not empty.
+   *
+   * @param storeDir the base directory of the store
+   * @param offsetFileName name of the offset file
+   * @return true if the offset file is valid. false otherwise.
+   */
+  public static boolean isOffsetFileValid(File storeDir, String offsetFileName) {
+    boolean hasValidOffsetFile = false;
+    if (storeDir.exists()) {
+      String offsetContents = readOffsetFile(storeDir, offsetFileName);
+      if (offsetContents != null && !offsetContents.isEmpty()) {
+        hasValidOffsetFile = true;
+      } else {
+        LOG.info("Offset file is not valid for store: {}.", storeDir.toPath());
+      }
+    }
+
+    return hasValidOffsetFile;
+  }
+
+  /**
+   * Read and return the contents of the offset file.
+   *
+   * @param storagePartitionDir the base directory of the store
+   * @param offsetFileName name of the offset file
+   * @return the content of the offset file if it exists for the store, null otherwise.
+   */
+  public static String readOffsetFile(File storagePartitionDir, String offsetFileName) {
+    String offset = null;
+    File offsetFileRef = new File(storagePartitionDir, offsetFileName);
+    String storePath = storagePartitionDir.getPath();
+
+    if (offsetFileRef.exists()) {
+      LOG.info("Found offset file in storage partition directory: {}", storePath);
+      offset = FileUtil.readWithChecksum(offsetFileRef);
+    } else {
+      LOG.info("No offset file found in storage partition directory: {}", storePath);
+    }
+
+    return offset;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
new file mode 100644
index 0000000..7a0a822
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.collect.ImmutableList;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaStorageConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.FileUtil;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * A storage manager for all side input stores. It is associated with each {@link org.apache.samza.container.TaskInstance}
+ * and is responsible for handling directory management, offset tracking and offset file management for the side input stores.
+ */
+public class TaskSideInputStorageManager {
+ private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputStorageManager.class);
+ // File name for the side input offset file; presumably one per store directory (writer is outside this view — confirm).
+ private static final String OFFSET_FILE = "SIDE-INPUT-OFFSETS";
+ private static final long STORE_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1); // same as changelog delete retention
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private final Clock clock;
+ // Side inputs processor per store name; process() calls it for every incoming message of the store's SSPs.
+ private final Map<String, SideInputsProcessor> storeToProcessor;
+ // Side input storage engines keyed by store name.
+ private final Map<String, StorageEngine> stores;
+ private final String storeBaseDir;
+ // Side input SSPs of each store, and the inverse index (SSP -> stores it feeds) built in the constructor.
+ private final Map<String, Set<SystemStreamPartition>> storeToSSps;
+ private final Map<SystemStreamPartition, Set<String>> sspsToStores;
+ private final StreamMetadataCache streamMetadataCache;
+ private final SystemAdmins systemAdmins;
+ private final TaskName taskName;
+ private final JavaStorageConfig storageConfig;
+ // Offset of the last processed message per SSP: seeded from getFileOffsets() in init() and advanced in process().
+ // NOTE(review): ConcurrentHashMap presumably because it is read from a different thread than process() — confirm.
+ private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+ // Computed in init(); null until init() has run.
+ private Map<SystemStreamPartition, String> startingOffsets;
+
+  /**
+   * Creates the side input storage manager for a single task.
+   *
+   * @param taskName name of the task this manager belongs to
+   * @param streamMetadataCache the stream metadata cache
+   * @param storeBaseDir base directory for the side input stores
+   * @param sideInputStores side input storage engines keyed by store name
+   * @param storesToProcessor side inputs processor for each store
+   * @param storesToSSPs side input SSPs for each store
+   * @param systemAdmins the system admins
+   * @param config the job configuration
+   * @param clock the clock
+   */
+  public TaskSideInputStorageManager(
+      TaskName taskName,
+      StreamMetadataCache streamMetadataCache,
+      String storeBaseDir,
+      Map<String, StorageEngine> sideInputStores,
+      Map<String, SideInputsProcessor> storesToProcessor,
+      Map<String, Set<SystemStreamPartition>> storesToSSPs,
+      SystemAdmins systemAdmins,
+      Config config,
+      Clock clock) {
+    this.clock = clock;
+    this.storageConfig = new JavaStorageConfig(config);
+    this.stores = sideInputStores;
+    this.storeBaseDir = storeBaseDir;
+    this.storeToSSps = storesToSSPs;
+    this.streamMetadataCache = streamMetadataCache;
+    this.systemAdmins = systemAdmins;
+    this.taskName = taskName;
+    this.storeToProcessor = storesToProcessor;
+
+    validateStoreConfiguration();
+
+    // Build the inverse index (SSP -> stores fed by that SSP) so that process() can route each message
+    // to every store it is a side input of.
+    this.sspsToStores = new HashMap<>();
+    storesToSSPs.forEach((storeName, ssps) ->
+        ssps.forEach(ssp -> sspsToStores.computeIfAbsent(ssp, key -> new HashSet<>()).add(storeName)));
+  }
+
+  /**
+   * Initializes the side input storage manager. It computes the starting offset of every side input SSP from the
+   * file offsets and the oldest offsets of the streams, then seeds the last processed offsets from the file offsets.
+   * Finally, it initializes the store directories.
+   */
+  public void init() {
+    LOG.info("Initializing side input stores.");
+
+    Map<SystemStreamPartition, String> fileOffsets = getFileOffsets();
+    // Both offset maps need an explicit {} placeholder, otherwise SLF4J silently drops the argument.
+    LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);
+
+    Map<SystemStreamPartition, String> oldestOffsets = getOldestOffsets();
+    LOG.info("Oldest offsets for the task {}: {}", taskName, oldestOffsets);
+
+    startingOffsets = getStartingOffsets(fileOffsets, oldestOffsets);
+    LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
+
+    lastProcessedOffsets.putAll(fileOffsets);
+    LOG.info("Last processed offsets for the task {}: {}", taskName, lastProcessedOffsets);
+
+    initializeStoreDirectories();
+  }
+
+  /**
+   * Flushes every side input storage engine, then writes the offset files to disk.
+   */
+  public void flush() {
+    LOG.info("Flushing the side input stores.");
+    for (StorageEngine storageEngine : stores.values()) {
+      storageEngine.flush();
+    }
+    writeOffsetFiles();
+  }
+
+  /**
+   * Stops every side input storage engine, then writes the offset files to disk.
+   */
+  public void stop() {
+    LOG.info("Stopping the side input stores.");
+    for (StorageEngine storageEngine : stores.values()) {
+      storageEngine.stop();
+    }
+    writeOffsetFiles();
+  }
+
+ /**
+ * Gets the {@link StorageEngine} associated with the input {@code storeName} if found, or null.
+ *
+ * @param storeName store name to get the {@link StorageEngine} for
+ * @return the {@link StorageEngine} associated with {@code storeName} if found, or null
+ */
+ public StorageEngine getStore(String storeName) {
+ return stores.get(storeName);
+ }
+
+ /**
+ * Gets the starting offset for the given side input {@link SystemStreamPartition}.
+ *
+ * Note: The method doesn't respect {@link org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT()} and
+ * {@link org.apache.samza.config.StreamConfig#CONSUMER_RESET_OFFSET()} configurations. It will use the local offset
+ * file if it is valid, else it will fall back to oldest offset in the stream.
+ *
+ * @param ssp side input system stream partition to get the starting offset for
+ * @return the starting offset
+ */
+ public String getStartingOffset(SystemStreamPartition ssp) {
+ return startingOffsets.get(ssp);
+ }
+
+ /**
+ * Gets the last processed offset for the given side input {@link SystemStreamPartition}.
+ *
+ * @param ssp side input system stream partition to get the last processed offset for
+ * @return the last processed offset
+ */
+ public String getLastProcessedOffset(SystemStreamPartition ssp) {
+ return lastProcessedOffsets.get(ssp);
+ }
+
+ /**
+  * Applies an incoming side input message to every store fed by its {@link SystemStreamPartition}.
+  * For each such store, the store's {@link SideInputsProcessor} turns the message into entries that are
+  * written to that store. The message offset is then recorded as the SSP's last processed offset.
+  *
+  * @param message incoming side input message to be processed
+  */
+ public void process(IncomingMessageEnvelope message) {
+   SystemStreamPartition incomingSsp = message.getSystemStreamPartition();
+
+   for (String targetStore : sspsToStores.get(incomingSsp)) {
+     KeyValueStore kvStore = (KeyValueStore) stores.get(targetStore);
+     SideInputsProcessor processor = storeToProcessor.get(targetStore);
+     Collection<Entry<?, ?>> updates = processor.process(message, kvStore);
+     kvStore.putAll(ImmutableList.copyOf(updates));
+   }
+
+   // progress is recorded only after every store fed by this SSP has been updated with the message
+   lastProcessedOffsets.put(incomingSsp, message.getOffset());
+ }
+
+ /**
+  * Initializes the store directories for all the stores:
+  * 1. Cleans up the directories for invalid stores.
+  * 2. Ensures that the directories exist, failing fast if one cannot be created.
+  *
+  * @throws SamzaException if a missing store directory cannot be created
+  */
+ private void initializeStoreDirectories() {
+   LOG.info("Initializing side input store directories.");
+
+   stores.keySet().forEach(storeName -> {
+     File storeLocation = getStoreLocation(storeName);
+     String storePath = storeLocation.toPath().toString();
+     if (!isValidSideInputStore(storeName, storeLocation)) {
+       LOG.info("Cleaning up the store directory at {} for {}", storePath, storeName);
+       FileUtil.rm(storeLocation);
+     }
+
+     if (!storeLocation.exists()) {
+       LOG.info("Creating {} as the store directory for the side input store {}", storePath, storeName);
+       // File.mkdirs() reports failure only through its return value. Surface the failure here rather than
+       // letting it show up later, e.g. when the offset file is written into this directory. The isDirectory()
+       // re-check tolerates the directory having been created concurrently.
+       if (!storeLocation.mkdirs() && !storeLocation.isDirectory()) {
+         throw new SamzaException(
+             String.format("Failed to create the store directory %s for side input store: %s.", storePath, storeName));
+       }
+     }
+   });
+ }
+
+ /**
+ * Writes the offset files for all side input stores one by one. There is one offset file per store.
+ * Its contents are a JSON encoded mapping from each side input SSP to its last processed offset, and a checksum.
+ */
+ private void writeOffsetFiles() {
+ storeToSSps.entrySet().stream()
+ .filter(entry -> isPersistedStore(entry.getKey())) // filter out in-memory side input stores
+ .forEach((entry) -> {
+ String storeName = entry.getKey();
+ Map<SystemStreamPartition, String> offsets = entry.getValue().stream()
+ .filter(lastProcessedOffsets::containsKey)
+ .collect(Collectors.toMap(Function.identity(), lastProcessedOffsets::get));
+
+ try {
+ String fileContents = OBJECT_MAPPER.writeValueAsString(offsets);
+ File offsetFile = new File(getStoreLocation(storeName), OFFSET_FILE);
+ FileUtil.writeWithChecksum(offsetFile, fileContents);
+ } catch (Exception e) {
+ throw new SamzaException("Failed to write offset file for side input store: " + storeName, e);
+ }
+ });
+ }
+
+ /**
+  * Gets the side input SSP offsets for all stores from their local offset files.
+  *
+  * <p>Each file is deserialized into an explicitly typed {@code Map<SystemStreamPartition, String>}. A raw
+  * {@code Map.class} read would produce {@code String} keys. Those would be unchecked-inserted into the
+  * SSP-keyed result and never match an SSP lookup, so the local offsets would be silently ignored.
+  * A file that cannot be read or parsed is logged and skipped.
+  *
+  * @return a {@link Map} of {@link SystemStreamPartition} to offset in the offset files.
+  */
+ private Map<SystemStreamPartition, String> getFileOffsets() {
+   LOG.info("Loading initial offsets from the file for side input stores.");
+   Map<SystemStreamPartition, String> fileOffsets = new HashMap<>();
+
+   stores.keySet().forEach(storeName -> {
+     LOG.debug("Reading local offsets for store: {}", storeName);
+
+     File storeLocation = getStoreLocation(storeName);
+     if (isValidSideInputStore(storeName, storeLocation)) {
+       try {
+         String fileContents = StorageManagerUtil.readOffsetFile(storeLocation, OFFSET_FILE);
+         Map<SystemStreamPartition, String> offsets = OBJECT_MAPPER.readValue(fileContents,
+             OBJECT_MAPPER.getTypeFactory().constructMapType(HashMap.class, SystemStreamPartition.class, String.class));
+         fileOffsets.putAll(offsets);
+       } catch (Exception e) {
+         LOG.warn("Failed to load the offset file for side input store:" + storeName, e);
+       }
+     }
+   });
+
+   return fileOffsets;
+ }
+
+ /**
+  * Resolves the directory of a side input store for this task: {@code <storeName>/<taskName>} under
+  * {@code storeBaseDir}, with every space in that relative path replaced by an underscore.
+  */
+ private File getStoreLocation(String storeName) {
+   String relativePath = storeName + File.separator + taskName.toString();
+   return new File(storeBaseDir, relativePath.replace(' ', '_'));
+ }
+
+ /**
+  * Computes the starting offset of every side input {@link SystemStreamPartition}. For each SSP this delegates to
+  * {@link StorageManagerUtil#getStartingOffset} with the SSP's system admin, its local file offset (possibly absent)
+  * and the oldest offset available in the source.
+  *
+  * @param fileOffsets offsets read from the local offset files
+  * @param oldestOffsets oldest offsets available in the source streams
+  * @return a {@link Map} of {@link SystemStreamPartition} to its starting offset
+  */
+ private Map<SystemStreamPartition, String> getStartingOffsets(
+     Map<SystemStreamPartition, String> fileOffsets, Map<SystemStreamPartition, String> oldestOffsets) {
+   Map<SystemStreamPartition, String> result = new HashMap<>();
+
+   for (SystemStreamPartition ssp : sspsToStores.keySet()) {
+     String startingOffset = StorageManagerUtil.getStartingOffset(
+         ssp, systemAdmins.getSystemAdmin(ssp.getSystem()), fileOffsets.get(ssp), oldestOffsets.get(ssp));
+     result.put(ssp, startingOffset);
+   }
+
+   return result;
+ }
+
+ /**
+  * Looks up the oldest available offset of every side input {@link SystemStreamPartition}.
+  * The SSPs are grouped by {@link SystemStream}, and the {@link SystemStreamMetadata} of all those streams is
+  * fetched from the {@link StreamMetadataCache} in one call. Each SSP's oldest offset is then read from the
+  * partition metadata of its stream.
+  *
+  * @return a {@link Map} of {@link SystemStreamPartition} to their oldest offset.
+  */
+ private Map<SystemStreamPartition, String> getOldestOffsets() {
+   Map<SystemStream, List<SystemStreamPartition>> sspsBySystemStream = sspsToStores.keySet().stream()
+       .collect(Collectors.groupingBy(SystemStreamPartition::getSystemStream));
+
+   Map<SystemStream, SystemStreamMetadata> metadataBySystemStream = JavaConverters.mapAsJavaMapConverter(
+       streamMetadataCache.getStreamMetadata(
+           JavaConverters.asScalaSetConverter(sspsBySystemStream.keySet()).asScala().toSet(), false)).asJava();
+
+   Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
+   for (Map.Entry<SystemStream, SystemStreamMetadata> streamEntry : metadataBySystemStream.entrySet()) {
+     Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata =
+         streamEntry.getValue().getSystemStreamPartitionMetadata();
+
+     oldestOffsets.putAll(sspsBySystemStream.get(streamEntry.getKey()).stream()
+         .collect(Collectors.toMap(Function.identity(),
+             ssp -> partitionMetadata.get(ssp.getPartition()).getOldestOffset())));
+   }
+
+   return oldestOffsets;
+ }
+
+ /**
+  * A side input store directory is reusable only if the store is persisted to disk, its offset file is not
+  * older than {@code STORE_DELETE_RETENTION_MS}, and that offset file is valid.
+  */
+ private boolean isValidSideInputStore(String storeName, File storeLocation) {
+   if (!isPersistedStore(storeName)) {
+     return false;
+   }
+
+   boolean isStale = StorageManagerUtil.isStaleStore(
+       storeLocation, OFFSET_FILE, STORE_DELETE_RETENTION_MS, clock.currentTimeMillis());
+   if (isStale) {
+     return false;
+   }
+
+   return StorageManagerUtil.isOffsetFileValid(storeLocation, OFFSET_FILE);
+ }
+
+ /**
+  * Returns true only if a side input store named {@code storeName} exists, has store properties, and those
+  * properties mark it as persisted to disk.
+  */
+ private boolean isPersistedStore(String storeName) {
+   StorageEngine store = stores.get(storeName);
+   StoreProperties properties = (store == null) ? null : store.getStoreProperties();
+   return properties != null && properties.isPersistedToDisk();
+ }
+
+ /**
+  * Validates the configuration of every side input store.
+  * 1. A side inputs processor must be configured, either as a factory class or as a serialized instance.
+  *    The container accepts either one (preferring the serialized instance), so requiring the factory alone
+  *    would reject stores that are configured only with a serialized processor.
+  * 2. A side input store cannot also be configured with a changelog.
+  *
+  * @throws SamzaException if any side input store violates one of the rules above
+  */
+ private void validateStoreConfiguration() {
+   stores.forEach((storeName, storageEngine) -> {
+     boolean hasProcessorFactory =
+         StringUtils.isNotBlank(storageConfig.getSideInputsProcessorFactory(storeName));
+     boolean hasSerializedProcessor =
+         StringUtils.isNotBlank(storageConfig.getSideInputsProcessorSerializedInstance(storeName));
+     if (!hasProcessorFactory && !hasSerializedProcessor) {
+       throw new SamzaException(String.format(
+           "Side inputs processor factory or serialized instance configuration missing for store: %s.", storeName));
+     }
+
+     if (storageEngine.getStoreProperties().isLoggedStore()) {
+       throw new SamzaException(
+           String.format("Cannot configure both side inputs and a changelog for store: %s.", storeName));
+     }
+   });
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
index e4ee767..c9df3b5 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
@@ -76,6 +76,18 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
conf.asScala.keys.filter(k => k.endsWith(".factory")).map(k => k.substring(0, k.length - ".factory".length)).toSeq
}
+ /** Side input streams configured for the given store, as reported by JavaStorageConfig. */
+ def getSideInputs(storeName: String): Seq[String] =
+   new JavaStorageConfig(config).getSideInputs(storeName).asScala
+
+ /** Side inputs processor factory class configured for the given store, or None when it is not set. */
+ def getSideInputsProcessorFactory(storeName: String): Option[String] = {
+   val factoryClassName = new JavaStorageConfig(config).getSideInputsProcessorFactory(storeName)
+   Option(factoryClassName)
+ }
+
+ /** Serialized side inputs processor instance configured for the given store, or None when it is not set. */
+ def getSideInputsProcessorSerializedInstance(storeName: String): Option[String] = {
+   val serializedInstance = new JavaStorageConfig(config).getSideInputsProcessorSerializedInstance(storeName)
+   Option(serializedInstance)
+ }
+
/**
* Build a map of storeName to changeLogDeleteRetention for all of the stores.
* @return a map from storeName to the changeLogDeleteRetention of the store in ms.
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 0a4623e..298c8ca 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -42,8 +42,8 @@ object StreamConfig {
// We don't want any external dependencies on these patterns while both exist. Use getProperty to ensure proper values.
private val STREAMS_PREFIX = "streams."
- private val STREAM_PREFIX = "systems.%s.streams.%s."
+ val STREAM_PREFIX = "systems.%s.streams.%s."
val STREAM_ID_PREFIX = STREAMS_PREFIX + "%s."
val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM
val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 89278ad..35802ac 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -30,7 +30,6 @@ import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorServic
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ThreadFactoryBuilder
-import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.MetricsConfig.Config2Metrics
@@ -48,10 +47,11 @@ import org.apache.samza.job.model.{ContainerModel, JobModel}
import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter}
import org.apache.samza.serializers._
import org.apache.samza.serializers.model.SamzaObjectMapper
-import org.apache.samza.storage.{StorageEngineFactory, TaskStorageManager}
+import org.apache.samza.storage._
import org.apache.samza.system._
import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, RoundRobinChooserFactory}
import org.apache.samza.table.TableManager
+import org.apache.samza.table.utils.SerdeUtils
import org.apache.samza.task._
import org.apache.samza.util.Util
import org.apache.samza.util._
@@ -354,6 +354,14 @@ object SamzaContainer extends Logging {
info("Got intermediate streams: %s" format intermediateStreams)
+ val sideInputStoresToSystemStreams = config.getStoreNames
+ .map { storeName => (storeName, config.getSideInputs(storeName)) }
+ .filter { case (storeName, sideInputs) => sideInputs.nonEmpty }
+ .map { case (storeName, sideInputs) => (storeName, sideInputs.map(Util.getSystemStreamFromNameOrId(config, _))) }
+ .toMap
+
+ info("Got side input store system streams: %s" format sideInputStoresToSystemStreams)
+
val controlMessageKeySerdes = intermediateStreams
.flatMap(streamId => {
val systemStream = config.streamIdToSystemStream(streamId)
@@ -531,7 +539,7 @@ object SamzaContainer extends Logging {
val nonLoggedStorageBaseDir = getNonLoggedStorageBaseDir(config, defaultStoreBaseDir)
info("Got base directory for non logged data stores: %s" format nonLoggedStorageBaseDir)
- var loggedStorageBaseDir = getLoggedStorageBaseDir(config, defaultStoreBaseDir)
+ val loggedStorageBaseDir = getLoggedStorageBaseDir(config, defaultStoreBaseDir)
info("Got base directory for logged data stores: %s" format loggedStorageBaseDir)
val taskStores = storageEngineFactories
@@ -577,9 +585,32 @@ object SamzaContainer extends Logging {
info("Got task stores: %s" format taskStores)
+ val taskSSPs = taskModel.getSystemStreamPartitions.asScala.toSet
+ info("Got task SSPs: %s" format taskSSPs)
+
+ val (sideInputStores, nonSideInputStores) =
+ taskStores.partition { case (storeName, _) => sideInputStoresToSystemStreams.contains(storeName)}
+
+ val sideInputStoresToSSPs = sideInputStoresToSystemStreams.mapValues(sideInputSystemStreams =>
+ taskSSPs.filter(ssp => sideInputSystemStreams.contains(ssp.getSystemStream)).asJava)
+
+ val taskSideInputSSPs = sideInputStoresToSSPs.values.flatMap(_.asScala).toSet
+
+ info ("Got task side input SSPs: %s" format taskSideInputSSPs)
+
+ val sideInputStoresToProcessor = sideInputStores.keys.map(storeName => {
+ // serialized instances takes precedence over the factory configuration.
+ config.getSideInputsProcessorSerializedInstance(storeName).map(serializedInstance =>
+ (storeName, SerdeUtils.deserialize("Side Inputs Processor", serializedInstance)))
+ .orElse(config.getSideInputsProcessorFactory(storeName).map(factoryClassName =>
+ (storeName, Util.getObj(factoryClassName, classOf[SideInputsProcessorFactory])
+ .getSideInputsProcessor(config, taskInstanceMetrics.registry))))
+ .get
+ }).toMap
+
val storageManager = new TaskStorageManager(
taskName = taskName,
- taskStores = taskStores,
+ taskStores = nonSideInputStores,
storeConsumers = storeConsumers,
changeLogSystemStreams = changeLogSystemStreams,
maxChangeLogStreamPartitions,
@@ -592,17 +623,24 @@ object SamzaContainer extends Logging {
new StorageConfig(config).getChangeLogDeleteRetentionsInMs,
new SystemClock)
+ var sideInputStorageManager: TaskSideInputStorageManager = null
+ if (sideInputStores.nonEmpty) {
+ sideInputStorageManager = new TaskSideInputStorageManager(
+ taskName,
+ streamMetadataCache,
+ loggedStorageBaseDir.getPath,
+ sideInputStores.asJava,
+ sideInputStoresToProcessor.asJava,
+ sideInputStoresToSSPs.asJava,
+ systemAdmins,
+ config,
+ new SystemClock)
+ }
+
val tableManager = new TableManager(config, serdes.asJava)
info("Got table manager")
- val systemStreamPartitions = taskModel
- .getSystemStreamPartitions
- .asScala
- .toSet
-
- info("Retrieved SystemStreamPartitions " + systemStreamPartitions + " for " + taskName)
-
def createTaskInstance(task: Any): TaskInstance = new TaskInstance(
task = task,
taskName = taskName,
@@ -616,11 +654,13 @@ object SamzaContainer extends Logging {
storageManager = storageManager,
tableManager = tableManager,
reporters = reporters,
- systemStreamPartitions = systemStreamPartitions,
+ systemStreamPartitions = taskSSPs,
exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, config),
jobModel = jobModel,
streamMetadataCache = streamMetadataCache,
- timerExecutor = timerExecutor)
+ timerExecutor = timerExecutor,
+ sideInputSSPs = taskSideInputSSPs,
+ sideInputStorageManager = sideInputStorageManager)
val taskInstance = createTaskInstance(task)
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 64ee7f3..0caca4f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -28,11 +28,12 @@ import org.apache.samza.config.Config
import org.apache.samza.config.StreamConfig.Config2Stream
import org.apache.samza.job.model.JobModel
import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.storage.TaskStorageManager
+import org.apache.samza.storage.kv.KeyValueStore
+import org.apache.samza.storage.{TaskSideInputStorageManager, TaskStorageManager}
import org.apache.samza.system._
import org.apache.samza.table.TableManager
import org.apache.samza.task._
-import org.apache.samza.util.Logging
+import org.apache.samza.util.{Logging, ScalaJavaUtil}
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
@@ -55,15 +56,29 @@ class TaskInstance(
val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler,
jobModel: JobModel = null,
streamMetadataCache: StreamMetadataCache = null,
- timerExecutor : ScheduledExecutorService = null) extends Logging {
+ timerExecutor : ScheduledExecutorService = null,
+ sideInputSSPs: Set[SystemStreamPartition] = Set(),
+ sideInputStorageManager: TaskSideInputStorageManager = null) extends Logging {
+
val isInitableTask = task.isInstanceOf[InitableTask]
val isWindowableTask = task.isInstanceOf[WindowableTask]
val isEndOfStreamListenerTask = task.isInstanceOf[EndOfStreamListenerTask]
val isClosableTask = task.isInstanceOf[ClosableTask]
val isAsyncTask = task.isInstanceOf[AsyncStreamTask]
+ // Resolves a store name to a KeyValueStore, checking the regular task stores first and the side input
+ // stores second; yields null when neither manager has a store with that name.
+ val kvStoreSupplier = ScalaJavaUtil.toJavaFunction(
+   (storeName: String) => {
+     val taskStore = if (storageManager == null) None else storageManager.getStore(storeName)
+     if (taskStore.isDefined) {
+       taskStore.get.asInstanceOf[KeyValueStore[_, _]]
+     } else if (sideInputStorageManager != null && sideInputStorageManager.getStore(storeName) != null) {
+       sideInputStorageManager.getStore(storeName).asInstanceOf[KeyValueStore[_, _]]
+     } else {
+       null
+     }
+   })
+
val context = new TaskContextImpl(taskName, metrics, containerContext, systemStreamPartitions.asJava, offsetManager,
- storageManager, tableManager, jobModel, streamMetadataCache, timerExecutor)
+ kvStoreSupplier, tableManager, jobModel, streamMetadataCache, timerExecutor)
// store the (ssp -> if this ssp is catched up) mapping. "catched up"
// means the same ssp in other taskInstances have the same offset as
@@ -85,7 +100,8 @@ class TaskInstance(
def registerOffsets {
debug("Registering offsets for taskName: %s" format taskName)
- offsetManager.register(taskName, systemStreamPartitions)
+ val sspsToRegister = systemStreamPartitions -- sideInputSSPs
+ offsetManager.register(taskName, sspsToRegister)
}
def startStores {
@@ -96,6 +112,13 @@ class TaskInstance(
} else {
debug("Skipping storage manager initialization for taskName: %s" format taskName)
}
+
+ if (sideInputStorageManager != null) {
+ debug("Starting side input storage manager for taskName: %s" format taskName)
+ sideInputStorageManager.init()
+ } else {
+ debug("Skipping side input storage manager initialization for taskName: %s" format taskName)
+ }
}
def startTableManager {
@@ -128,14 +151,14 @@ class TaskInstance(
debug("Registering consumers for taskName: %s" format taskName)
systemStreamPartitions.foreach(systemStreamPartition => {
- val offset = offsetManager.getStartingOffset(taskName, systemStreamPartition)
- .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format systemStreamPartition))
- consumerMultiplexer.register(systemStreamPartition, offset)
- metrics.addOffsetGauge(systemStreamPartition, () => {
- offsetManager
- .getLastProcessedOffset(taskName, systemStreamPartition)
- .orNull
- })
+ val startingOffset = getStartingOffset(systemStreamPartition)
+ consumerMultiplexer.register(systemStreamPartition, startingOffset)
+ metrics.addOffsetGauge(systemStreamPartition, () =>
+ if (sideInputSSPs.contains(systemStreamPartition)) {
+ sideInputStorageManager.getLastProcessedOffset(systemStreamPartition)
+ } else {
+ offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).orNull
+ })
})
}
@@ -143,31 +166,37 @@ class TaskInstance(
callbackFactory: TaskCallbackFactory = null) {
metrics.processes.inc
- if (!ssp2CaughtupMapping.getOrElse(envelope.getSystemStreamPartition,
- throw new SamzaException(envelope.getSystemStreamPartition + " is not registered!"))) {
+ val incomingMessageSsp = envelope.getSystemStreamPartition
+
+ if (!ssp2CaughtupMapping.getOrElse(incomingMessageSsp,
+ throw new SamzaException(incomingMessageSsp + " is not registered!"))) {
checkCaughtUp(envelope)
}
- if (ssp2CaughtupMapping(envelope.getSystemStreamPartition)) {
+ if (ssp2CaughtupMapping(incomingMessageSsp)) {
metrics.messagesActuallyProcessed.inc
trace("Processing incoming message envelope for taskName and SSP: %s, %s"
- format (taskName, envelope.getSystemStreamPartition))
+ format (taskName, incomingMessageSsp))
- if (isAsyncTask) {
- exceptionHandler.maybeHandle {
- val callback = callbackFactory.createCallback()
- task.asInstanceOf[AsyncStreamTask].processAsync(envelope, collector, coordinator, callback)
- }
+ if (sideInputSSPs.contains(incomingMessageSsp)) {
+ sideInputStorageManager.process(envelope)
} else {
- exceptionHandler.maybeHandle {
- task.asInstanceOf[StreamTask].process(envelope, collector, coordinator)
- }
+ if (isAsyncTask) {
+ exceptionHandler.maybeHandle {
+ val callback = callbackFactory.createCallback()
+ task.asInstanceOf[AsyncStreamTask].processAsync(envelope, collector, coordinator, callback)
+ }
+ } else {
+ exceptionHandler.maybeHandle {
+ task.asInstanceOf[StreamTask].process(envelope, collector, coordinator)
+ }
- trace("Updating offset map for taskName, SSP and offset: %s, %s, %s"
- format (taskName, envelope.getSystemStreamPartition, envelope.getOffset))
+ trace("Updating offset map for taskName, SSP and offset: %s, %s, %s"
+ format (taskName, incomingMessageSsp, envelope.getOffset))
- offsetManager.update(taskName, envelope.getSystemStreamPartition, envelope.getOffset)
+ offsetManager.update(taskName, incomingMessageSsp, envelope.getOffset)
+ }
}
}
}
@@ -217,8 +246,12 @@ class TaskInstance(
storageManager.flush
}
- trace("Checkpointing offsets for taskName: %s" format taskName)
+ trace("Flushing side input stores for taskName: %s" format taskName)
+ if (sideInputStorageManager != null) {
+ sideInputStorageManager.flush()
+ }
+ trace("Checkpointing offsets for taskName: %s" format taskName)
offsetManager.writeCheckpoint(taskName, checkpoint)
if (checkpoint != null) {
@@ -249,6 +282,13 @@ class TaskInstance(
} else {
debug("Skipping storage manager shutdown for taskName: %s" format taskName)
}
+
+ if (sideInputStorageManager != null) {
+ debug("Shutting down side input storage manager for taskName: %s" format taskName)
+ sideInputStorageManager.stop()
+ } else {
+ debug("Skipping side input storage manager shutdown for taskName: %s" format taskName)
+ }
}
def shutdownTableManager {
@@ -272,27 +312,29 @@ class TaskInstance(
* it's already catched-up.
*/
private def checkCaughtUp(envelope: IncomingMessageEnvelope) = {
+ val incomingMessageSsp = envelope.getSystemStreamPartition
+
if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(envelope.getOffset)) {
- ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+ ssp2CaughtupMapping(incomingMessageSsp) = true
} else {
systemAdmins match {
case null => {
warn("systemAdmin is null. Set all SystemStreamPartitions to catched-up")
- ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+ ssp2CaughtupMapping(incomingMessageSsp) = true
}
case others => {
- val startingOffset = offsetManager.getStartingOffset(taskName, envelope.getSystemStreamPartition)
- .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format envelope.getSystemStreamPartition))
- val system = envelope.getSystemStreamPartition.getSystem
+ val startingOffset = getStartingOffset(incomingMessageSsp)
+
+ val system = incomingMessageSsp.getSystem
others.getSystemAdmin(system).offsetComparator(envelope.getOffset, startingOffset) match {
case null => {
info("offsets in " + system + " is not comparable. Set all SystemStreamPartitions to catched-up")
- ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true // not comparable
+ ssp2CaughtupMapping(incomingMessageSsp) = true // not comparable
}
case result => {
if (result >= 0) {
- info(envelope.getSystemStreamPartition.toString + " is catched up.")
- ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+ info(incomingMessageSsp.toString + " is catched up.")
+ ssp2CaughtupMapping(incomingMessageSsp) = true
}
}
}
@@ -300,4 +342,18 @@ class TaskInstance(
}
}
}
+
+ /**
+  * Starting offset for an SSP. Side input SSPs are answered by the side input storage manager; all other
+  * SSPs by the offset manager. Throws SamzaException when no offset is known.
+  */
+ private def getStartingOffset(systemStreamPartition: SystemStreamPartition): String = {
+   val maybeOffset =
+     if (sideInputSSPs.contains(systemStreamPartition)) Option(sideInputStorageManager.getStartingOffset(systemStreamPartition))
+     else offsetManager.getStartingOffset(taskName, systemStreamPartition)
+
+   maybeOffset match {
+     case Some(offset) => offset
+     case None => throw new SamzaException("No offset defined for SystemStreamPartition: %s" format systemStreamPartition)
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 62b59fb..90fdc19 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -28,8 +28,6 @@ import org.apache.samza.container.TaskName
import org.apache.samza.system._
import org.apache.samza.util.{Clock, FileUtil, Logging}
-import scala.collection.JavaConverters._
-
object TaskStorageManager {
def getStoreDir(storeBaseDir: File, storeName: String) = {
new File(storeBaseDir, storeName)
@@ -70,7 +68,7 @@ class TaskStorageManager(
val fileOffsets: util.Map[SystemStreamPartition, String] = new util.HashMap[SystemStreamPartition, String]()
val offsetFileName = "OFFSET"
- def apply(storageEngineName: String) = taskStores(storageEngineName)
+ /** Looks up a store by name in taskStores; returns None when taskStores has no entry for storeName. */
+ def getStore(storeName: String): Option[StorageEngine] = taskStores.get(storeName)
def init {
cleanBaseDirs()
@@ -101,7 +99,7 @@ class TaskStorageManager(
info("Deleting logged storage partition directory %s." format loggedStorePartitionDir.toPath.toString)
FileUtil.rm(loggedStorePartitionDir)
} else {
- val offset = readOffsetFile(loggedStorePartitionDir)
+ val offset = StorageManagerUtil.readOffsetFile(loggedStorePartitionDir, offsetFileName)
info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStorePartitionDir))
if (offset != null) {
fileOffsets.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset)
@@ -111,7 +109,7 @@ class TaskStorageManager(
}
/**
- * Directory {@code loggedStoreDir} associated with the logged store {@code storeName} is valid,
+ * Directory loggedStoreDir associated with the logged store storeName is valid
* if all of the following conditions are true.
* a) If the store has to be persisted to disk.
* b) If there is a valid offset file associated with the logged store.
@@ -120,55 +118,12 @@ class TaskStorageManager(
* @return true if the logged store is valid, false otherwise.
*/
private def isLoggedStoreValid(storeName: String, loggedStoreDir: File): Boolean = {
- val changeLogDeleteRetentionInMs = changeLogDeleteRetentionsInMs.getOrElse(storeName,
- StorageConfig.DEFAULT_CHANGELOG_DELETE_RETENTION_MS)
- persistedStores.contains(storeName) && isOffsetFileValid(loggedStoreDir) &&
- !isStaleLoggedStore(loggedStoreDir, changeLogDeleteRetentionInMs)
- }
+ val changeLogDeleteRetentionInMs = changeLogDeleteRetentionsInMs
+ .getOrElse(storeName, StorageConfig.DEFAULT_CHANGELOG_DELETE_RETENTION_MS)
- /**
- * Determines if the logged store directory {@code loggedStoreDir} is stale. A store is stale if the following condition is true.
- *
- * ((CurrentTime) - (LastModifiedTime of the Offset file) is greater than the changelog's tombstone retention).
- *
- * @param loggedStoreDir the base directory of the local change-logged store.
- * @param changeLogDeleteRetentionInMs the delete retention of the changelog in milli seconds.
- * @return true if the store is stale, false otherwise.
- *
- */
- private def isStaleLoggedStore(loggedStoreDir: File, changeLogDeleteRetentionInMs: Long): Boolean = {
- var isStaleStore = false
- val storePath = loggedStoreDir.toPath.toString
- if (loggedStoreDir.exists()) {
- val offsetFileRef = new File(loggedStoreDir, offsetFileName)
- val offsetFileLastModifiedTime = offsetFileRef.lastModified()
- if ((clock.currentTimeMillis() - offsetFileLastModifiedTime) >= changeLogDeleteRetentionInMs) {
- info ("Store: %s is stale since lastModifiedTime of offset file: %s, " +
- "is older than changelog deleteRetentionMs: %s." format(storePath, offsetFileLastModifiedTime, changeLogDeleteRetentionInMs))
- isStaleStore = true
- }
- } else {
- info("Logged storage partition directory: %s does not exist." format storePath)
- }
- isStaleStore
- }
-
- /**
- * An offset file associated with logged store {@code loggedStoreDir} is valid if it exists and is not empty.
- *
- * @return true if the offset file is valid. false otherwise.
- */
- private def isOffsetFileValid(loggedStoreDir: File): Boolean = {
- var hasValidOffsetFile = false
- if (loggedStoreDir.exists()) {
- val offsetContents = readOffsetFile(loggedStoreDir)
- if (offsetContents != null && !offsetContents.isEmpty) {
- hasValidOffsetFile = true
- } else {
- info("Offset file is not valid for store: %s." format loggedStoreDir.toPath.toString)
- }
- }
- hasValidOffsetFile
+ persistedStores.contains(storeName) &&
+ StorageManagerUtil.isOffsetFileValid(loggedStoreDir, offsetFileName) &&
+ !StorageManagerUtil.isStaleStore(loggedStoreDir, offsetFileName, changeLogDeleteRetentionInMs, clock.currentTimeMillis())
}
private def setupBaseDirs() {
@@ -187,24 +142,6 @@ class TaskStorageManager(
}
}
- /**
- * Read and return the contents of the offset file.
- *
- * @param loggedStoragePartitionDir the base directory of the store
- * @return the content of the offset file if it exists for the store, null otherwise.
- */
- private def readOffsetFile(loggedStoragePartitionDir: File): String = {
- var offset : String = null
- val offsetFileRef = new File(loggedStoragePartitionDir, offsetFileName)
- if (offsetFileRef.exists()) {
- info("Found offset file in logged storage partition directory: %s" format loggedStoragePartitionDir.toPath.toString)
- offset = FileUtil.readWithChecksum(offsetFileRef)
- } else {
- info("No offset file found in logged storage partition directory: %s" format loggedStoragePartitionDir.toPath.toString)
- }
- offset
- }
-
private def validateChangelogStreams() = {
info("Validating change log streams: " + changeLogSystemStreams)
@@ -262,22 +199,7 @@ class TaskStorageManager(
.getOrElse(systemStreamPartition.getSystemStream,
throw new SamzaException("Missing a change log offset for %s." format systemStreamPartition))
- if (fileOffset != null) {
- // File offset was the last message written to the changelog that is also reflected in the store,
- // so we start with the NEXT offset
- val resumeOffset = admin.getOffsetsAfter(Map(systemStreamPartition -> fileOffset).asJava).get(systemStreamPartition)
- if (admin.offsetComparator(oldestOffset, resumeOffset) <= 0) {
- resumeOffset
- } else {
- // If the offset we plan to use is older than the oldest offset, just use the oldest offset.
- // This can happen with changelogs configured with a TTL cleanup policy
- warn(s"Local store offset $resumeOffset is lower than the oldest offset $oldestOffset of the changelog. " +
- s"The values between these offsets cannot be restored.")
- oldestOffset
- }
- } else {
- oldestOffset
- }
+ StorageManagerUtil.getStartingOffset(systemStreamPartition, admin, fileOffset, oldestOffset)
}
private def restoreStores() {
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
index 4b93543..46a2089 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
@@ -33,7 +33,7 @@ object FileUtil {
* @param file The file handle to write to
* @param data The data to be written to the file
* */
- def writeWithChecksum(file: File, data: String) = {
+ def writeWithChecksum(file: File, data: String): Unit = {
val checksum = getChecksum(data)
var oos: ObjectOutputStream = null
var fos: FileOutputStream = null
@@ -52,7 +52,7 @@ object FileUtil {
* Reads from a file that has a checksum prepended to the data
* @param file The file handle to read from
* */
- def readWithChecksum(file: File) = {
+ def readWithChecksum(file: File): String = {
var fis: FileInputStream = null
var ois: ObjectInputStream = null
try {
@@ -76,7 +76,7 @@ object FileUtil {
* Recursively remove a directory (or file), and all sub-directories. Equivalent
* to rm -rf.
*/
- def rm(file: File) {
+ def rm(file: File): Unit = {
if (file == null) {
return
} else if (file.isDirectory) {
@@ -96,7 +96,7 @@ object FileUtil {
* @param data The string for which checksum has to be generated
* @return long type value representing the checksum
* */
- def getChecksum(data: String) = {
+ def getChecksum(data: String): Long = {
val crc = new CRC32
crc.update(data.getBytes)
crc.getValue
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala
index f3ba746..a359cd5 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala
@@ -21,6 +21,8 @@
package org.apache.samza.util
+import java.util.function
+
import scala.collection.immutable.Map
import scala.collection.JavaConverters._
import scala.runtime.AbstractFunction0
@@ -47,6 +49,12 @@ object ScalaJavaUtil {
}
}
+ /**
+ * Wraps the provided Scala function in a Java [[java.util.function.Function]], e.g. for handing
+ * Scala code to Java APIs that expect a java.util.function.Function.
+ *
+ * @param scalaFunction the Scala function to delegate to
+ * @return a Java Function whose apply simply invokes scalaFunction
+ */
+ def toJavaFunction[T, R](scalaFunction: Function1[T, R]): java.util.function.Function[T, R] = {
+ new java.util.function.Function[T, R] {
+ override def apply(input: T): R = scalaFunction(input)
+ }
+ }
+
/**
* Wraps the provided Java Supplier in an Scala Function, e.g. for use in [[Option#getOrDefault]]
*
@@ -59,4 +67,8 @@ object ScalaJavaUtil {
override def apply(): T = javaFunction.get()
}
}
+
+ /**
+ * Wraps the provided Java [[java.util.function.Function]] in a Scala [[Function1]].
+ *
+ * @param javaFunction the Java function to delegate to
+ * @return a Scala function that invokes javaFunction.apply on its argument
+ */
+ def toScalaFunction[T, R](javaFunction: java.util.function.Function[T, R]): Function1[T, R] = {
+ (input: T) => javaFunction(input)
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 059eb03..c9534bc 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -87,6 +87,26 @@ object Util extends Logging {
}
/**
+ * Gets the [[SystemStream]] corresponding to the provided stream, which may be
+ * a streamId, or stream name of the format systemName.streamName.
+ *
+ * @param config the config used to resolve a streamId to its [[SystemStream]]
+ * @param stream the stream name or id to get the [[SystemStream]] for.
+ * @return the [[SystemStream]] for the stream
+ * @throws SamzaException if the stream does not split into one or two '.'-separated parts
+ */
+ def getSystemStreamFromNameOrId(config: Config, stream: String): SystemStream = {
+ // String.split takes a regular expression, so the '.' separator must be escaped. An unescaped "."
+ // matches every character, producing an empty array and rejecting every input.
+ val parts = stream.split("\\.")
+ if (parts.length == 0 || parts.length > 2) {
+ throw new SamzaException(
+ String.format("Invalid stream %s. Expected to be of the format streamId or systemName.streamName", stream))
+ }
+ if (parts.length == 1) {
+ new StreamConfig(config).streamIdToSystemStream(stream)
+ } else {
+ new SystemStream(parts(0), parts(1))
+ }
+ }
+
+ /**
* Returns the the first host address which is not the loopback address, or [[java.net.InetAddress#getLocalHost]] as a fallback
*
* @return the [[java.net.InetAddress]] which represents the localhost
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 88767f5..9cdbfe6 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -82,7 +82,7 @@ public class TestAsyncRunLoop {
return new TaskInstance(task, taskName, mock(Config.class), taskInstanceMetrics,
null, consumers, mock(TaskInstanceCollector.class), mock(SamzaContainerContext.class),
manager, null, null, null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics,
- new scala.collection.immutable.HashSet<String>()), null, null, null);
+ new scala.collection.immutable.HashSet<String>()), null, null, null, new scala.collection.immutable.HashSet<>(), null);
}
interface TestCode {
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 4ff7848..1672191 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -47,7 +47,6 @@ import org.mockito.Mockito
import org.mockito.Mockito._
import org.scalatest.Assertions.intercept
-import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConverters._
@@ -377,7 +376,7 @@ class TestTaskInstance {
val mockOrder = inOrder(offsetManager, collector, storageManager)
val taskInstance: TaskInstance = new TaskInstance(
- Mockito.mock(classOf[StreamTask]).asInstanceOf[StreamTask],
+ Mockito.mock(classOf[StreamTask]),
taskName,
new MapConfig,
new TaskInstanceMetrics,
@@ -418,7 +417,7 @@ class TestTaskInstance {
val offsetManager = Mockito.mock(classOf[OffsetManager])
val taskInstance: TaskInstance = new TaskInstance(
- Mockito.mock(classOf[StreamTask]).asInstanceOf[StreamTask],
+ Mockito.mock(classOf[StreamTask]),
taskName,
new MapConfig,
new TaskInstanceMetrics,
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index 3bb4e99..0b945cb 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -151,7 +151,7 @@ class TestTaskStorageManager extends MockitoSugar {
val ss = new SystemStream("kafka", "testStream")
val partition = new Partition(0)
val ssp = new SystemStreamPartition(ss, partition)
- val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName)
+ val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName)
val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = false, null)
@@ -182,12 +182,12 @@ class TestTaskStorageManager extends MockitoSugar {
taskManager.init
// Verify that the store directory doesn't have ANY files
- assertNull(storeDirectory.listFiles())
+ assertTrue(storeDirectory.list().isEmpty)
verify(mockSystemConsumer).register(ssp, "0")
// Test 2: flush should NOT create/update the offset file. Store directory has no files
taskManager.flush()
- assertNull(storeDirectory.listFiles())
+ assertTrue(storeDirectory.list().isEmpty)
// Test 3: Update sspMetadata before shutdown and verify that offset file is NOT created
metadata = new SystemStreamMetadata("testStream", new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
@@ -197,7 +197,7 @@ class TestTaskStorageManager extends MockitoSugar {
})
when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata))
taskManager.stop()
- assertNull(storeDirectory.listFiles())
+ assertTrue(storeDirectory.list().isEmpty)
// Test 4: Initialize again with an updated sspMetadata; Verify that it restores from the earliest offset
metadata = new SystemStreamMetadata("testStream", new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
@@ -209,7 +209,7 @@ class TestTaskStorageManager extends MockitoSugar {
taskManager.init
- assertNull(storeDirectory.listFiles())
+ assertTrue(storeDirectory.list().isEmpty)
// second time to register; make sure it starts from beginning
verify(mockSystemConsumer, times(2)).register(ssp, "0")
}
@@ -223,7 +223,7 @@ class TestTaskStorageManager extends MockitoSugar {
val taskStorageManager = new TaskStorageManagerBuilder()
.addStore(store, false)
- .addStore(loggedStore, true)
+ .addLoggedStore(loggedStore, true)
.build
//Invoke test method
@@ -244,7 +244,7 @@ class TestTaskStorageManager extends MockitoSugar {
FileUtil.writeWithChecksum(offsetFilePath, "100")
val taskStorageManager = new TaskStorageManagerBuilder()
- .addStore(loggedStore, true)
+ .addLoggedStore(loggedStore, true)
.build
val cleanDirMethod = taskStorageManager.getClass.getDeclaredMethod("cleanBaseDirs",
@@ -266,7 +266,7 @@ class TestTaskStorageManager extends MockitoSugar {
FileUtil.writeWithChecksum(offsetFile, "Test Offset Data")
offsetFile.setLastModified(0)
val taskStorageManager = new TaskStorageManagerBuilder().addStore(store, false)
- .addStore(loggedStore, true)
+ .addLoggedStore(loggedStore, true)
.build
val cleanDirMethod = taskStorageManager.getClass
@@ -285,7 +285,7 @@ class TestTaskStorageManager extends MockitoSugar {
FileUtil.writeWithChecksum(offsetFilePath, "100")
val taskStorageManager = new TaskStorageManagerBuilder()
- .addStore(loggedStore, false)
+ .addLoggedStore(loggedStore, false)
.build
val cleanDirMethod = taskStorageManager.getClass.getDeclaredMethod("cleanBaseDirs",
@@ -647,6 +647,13 @@ class TaskStorageManagerBuilder extends MockitoSugar {
addStore(storeName, mockStorageEngine, mock[SystemConsumer])
}
+ /**
+ * Registers a mocked store whose properties mark it as logged, and as persisted to disk
+ * when isPersistedToDisk is true.
+ */
+ def addLoggedStore(storeName: String, isPersistedToDisk: Boolean): TaskStorageManagerBuilder = {
+ val loggedStoreProperties = new StorePropertiesBuilder()
+ .setPersistedToDisk(isPersistedToDisk)
+ .setLoggedStore(true)
+ .build()
+ val loggedStorageEngine = mock[StorageEngine]
+ when(loggedStorageEngine.getStoreProperties).thenReturn(loggedStoreProperties)
+ addStore(storeName, loggedStorageEngine, mock[SystemConsumer])
+ }
+
def setPartition(p: Partition) = {
partition = p
this
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
index 2681fb3..2a9532b 100644
--- a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
+++ b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
@@ -50,7 +50,8 @@ public class InMemoryTableDescriptor<K, V> extends BaseLocalStoreBackedTableDesc
Map<String, String> tableSpecConfig = new HashMap<>();
generateTableSpecConfig(tableSpecConfig);
- return new TableSpec(tableId, serde, InMemoryTableProviderFactory.class.getName(), tableSpecConfig);
+ return new TableSpec(tableId, serde, InMemoryTableProviderFactory.class.getName(), tableSpecConfig,
+ sideInputs, sideInputsProcessor);
}
private void addInMemoryConfig(Map<String, String> map, String key, String value) {
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
index 2c62159..33bfc84 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
@@ -182,7 +182,8 @@ public class RocksDbTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescr
Map<String, String> tableSpecConfig = new HashMap<>();
generateTableSpecConfig(tableSpecConfig);
- return new TableSpec(tableId, serde, RocksDbTableProviderFactory.class.getName(), tableSpecConfig);
+ return new TableSpec(tableId, serde, RocksDbTableProviderFactory.class.getName(), tableSpecConfig,
+ sideInputs, sideInputsProcessor);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
index 1f9b57b..2d05f95 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
@@ -18,9 +18,13 @@
*/
package org.apache.samza.storage.kv;
+import com.google.common.base.Preconditions;
+
+import java.util.List;
import java.util.Map;
import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.storage.SideInputsProcessor;
/**
@@ -32,6 +36,8 @@ import org.apache.samza.operators.BaseTableDescriptor;
*/
abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLocalStoreBackedTableDescriptor<K, V, D>>
extends BaseTableDescriptor<K, V, D> {
+ /** Side inputs for this table's store, set via withSideInputs(); null when not configured. */
+ protected List<String> sideInputs;
+ /** Side inputs processor, set via withSideInputsProcessor(); validate() requires it whenever side inputs are set, and vice versa. */
+ protected SideInputsProcessor sideInputsProcessor;
/**
* Constructs a table descriptor instance
@@ -41,6 +47,16 @@ abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLo
super(tableId);
}
+ /**
+ * Sets the side inputs for the store backing this table.
+ * <p>
+ * Side inputs and a {@link SideInputsProcessor} must be supplied together; {@link #validate()}
+ * rejects a descriptor that has only one of them, or an empty side input list.
+ *
+ * @param sideInputs the side inputs; the list is stored by reference, not copied
+ * @return this descriptor, for chaining
+ */
+ @SuppressWarnings("unchecked")
+ public D withSideInputs(List<String> sideInputs) {
+ this.sideInputs = sideInputs;
+ D self = (D) this;
+ return self;
+ }
+
+ /**
+ * Sets the processor for this table's side inputs.
+ * <p>
+ * Must be supplied together with a non-empty side input list; see {@link #validate()}.
+ *
+ * @param sideInputsProcessor the side inputs processor
+ * @return this descriptor, for chaining
+ */
+ @SuppressWarnings("unchecked")
+ public D withSideInputsProcessor(SideInputsProcessor sideInputsProcessor) {
+ this.sideInputsProcessor = sideInputsProcessor;
+ D self = (D) this;
+ return self;
+ }
+
@Override
protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
super.generateTableSpecConfig(tableSpecConfig);
@@ -51,6 +67,11 @@ abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLo
*/
protected void validate() {
super.validate();
+ // Side inputs are optional; once either piece is configured, both must be present and the
+ // side input list must be non-empty.
+ boolean sideInputsRequested = sideInputs != null || sideInputsProcessor != null;
+ if (sideInputsRequested) {
+ boolean hasSideInputs = sideInputs != null && !sideInputs.isEmpty();
+ // Guava's message template defers formatting until the check actually fails.
+ Preconditions.checkArgument(hasSideInputs && sideInputsProcessor != null,
+ "Invalid side input configuration for table: %s. " +
+ "Both side inputs and the processor must be provided", tableId);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
index b494eba..cacfe95 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
@@ -19,9 +19,11 @@
package org.apache.samza.storage.kv;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.JavaStorageConfig;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;
@@ -30,6 +32,7 @@ import org.apache.samza.table.ReadableTable;
import org.apache.samza.table.Table;
import org.apache.samza.table.TableProvider;
import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.utils.SerdeUtils;
import org.apache.samza.task.TaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,6 +105,15 @@ abstract public class BaseLocalStoreBackedTableProvider implements TableProvider
String valueSerde = tableConfig.getValueSerde(tableSpec.getId());
storeConfig.put(String.format(StorageConfig.MSG_SERDE(), tableSpec.getId()), valueSerde);
+ List<String> sideInputs = tableSpec.getSideInputs();
+ if (sideInputs != null && !sideInputs.isEmpty()) {
+ String formattedSideInputs = String.join(",", sideInputs);
+
+ storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS, tableSpec.getId()), formattedSideInputs);
+ storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, tableSpec.getId()),
+ SerdeUtils.serialize("Side Inputs Processor", tableSpec.getSideInputsProcessor()));
+ }
+
return storeConfig;
}