Posted to reviews@bahir.apache.org by GitBox <gi...@apache.org> on 2021/03/11 11:07:29 UTC

[GitHub] [bahir-flink] eskabetxe commented on a change in pull request #113: Apache Pinot Connector Sink

eskabetxe commented on a change in pull request #113:
URL: https://github.com/apache/bahir-flink/pull/113#discussion_r592227559



##########
File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerApi.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.http.HttpEntity;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.*;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Helpers to interact with the Pinot controller via its public API.
+ */
+public class PinotControllerApi {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PinotControllerApi.class);
+    protected final String controllerHost;
+    protected final String controllerHostPort;
+
+    /**
+     * @param controllerHost Pinot controller's host
+     * @param controllerPort Pinot controller's port
+     */
+    public PinotControllerApi(String controllerHost, String controllerPort) {
+        this.controllerHost = checkNotNull(controllerHost);

Review comment:
       This could be removed; it's only used to generate the controllerHostPort, no?
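
       A minimal sketch of that simplification (assuming no subclass or other caller
       needs the raw host on its own; the field is protected, so worth double-checking):

           public PinotControllerApi(String controllerHost, String controllerPort) {
               checkNotNull(controllerHost);
               checkNotNull(controllerPort);
               // Keep only the combined URL; the raw host is not stored separately
               this.controllerHostPort = String.format("http://%s:%s", controllerHost, controllerPort);
           }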

##########
File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.committer;
+
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.streaming.connectors.pinot.PinotControllerApi;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.tools.admin.command.UploadSegmentCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The global committer takes committables from {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter},
+ * generates segments and pushes them to the Pinot controller.
+ * Note: We use a custom multithreading approach to parallelize the segment creation and upload to
+ * overcome the performance limitations resulting from using a {@link GlobalCommitter} always
+ * running at a parallelism of 1.
+ */
+public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PinotSinkGlobalCommitter.class);
+
+    private final String pinotControllerHost;
+    private final String pinotControllerPort;
+    private final String tableName;
+    private final SegmentNameGenerator segmentNameGenerator;
+    private final String tempDirPrefix;
+    private final FileSystemAdapter fsAdapter;
+    private final String timeColumnName;
+    private final TimeUnit segmentTimeUnit;
+
+    /**
+     * @param pinotControllerHost  Host of the Pinot controller
+     * @param pinotControllerPort  Port of the Pinot controller
+     * @param tableName            Target table's name
+     * @param segmentNameGenerator Pinot segment name generator
+     * @param fsAdapter            Adapter for interacting with the shared file system
+     * @param timeColumnName       Name of the column containing the timestamp
+     * @param segmentTimeUnit      Unit of the time column
+     */
+    public PinotSinkGlobalCommitter(String pinotControllerHost, String pinotControllerPort, String tableName, SegmentNameGenerator segmentNameGenerator, String tempDirPrefix, FileSystemAdapter fsAdapter, String timeColumnName, TimeUnit segmentTimeUnit) {
+        this.pinotControllerHost = checkNotNull(pinotControllerHost);
+        this.pinotControllerPort = checkNotNull(pinotControllerPort);
+        this.tableName = checkNotNull(tableName);
+        this.segmentNameGenerator = checkNotNull(segmentNameGenerator);
+        this.tempDirPrefix = checkNotNull(tempDirPrefix);
+        this.fsAdapter = checkNotNull(fsAdapter);
+        this.timeColumnName = checkNotNull(timeColumnName);
+        this.segmentTimeUnit = checkNotNull(segmentTimeUnit);
+    }
+
+    /**
+     * Identifies global committables that need to be re-committed from a list of recovered committables.
+     *
+     * @param globalCommittables List of global committables that are checked for required re-commit
+     * @return List of global committables that need to be re-committed
+     * @throws IOException
+     */
+    @Override
+    public List<PinotSinkGlobalCommittable> filterRecoveredCommittables(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException {
+        PinotControllerApi controllerApi = new PinotControllerApi(this.pinotControllerHost, this.pinotControllerPort);
+        List<PinotSinkGlobalCommittable> committablesToRetry = new ArrayList<>();
+
+        for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) {
+            CommitStatus commitStatus = this.getCommitStatus(globalCommittable);
+
+            if (commitStatus.getMissingSegmentNames().isEmpty()) {
+                // All segments were already committed. Thus, we do not need to retry the commit.
+                continue;
+            }
+
+            for (String existingSegment : commitStatus.getExistingSegmentNames()) {
+                // Some but not all segments were already committed. As we cannot ensure
+                // that the data files contain the same data as before the failure,
+                // we delete the already committed segments in order to recommit them later on.
+                controllerApi.deleteSegment(tableName, existingSegment);
+            }
+            committablesToRetry.add(globalCommittable);
+        }
+
+        return committablesToRetry;
+    }
+
+    /**
+     * Combines multiple {@link PinotSinkCommittable}s into one {@link PinotSinkGlobalCommittable}
+     * by finding the minimum and maximum timestamps from the provided {@link PinotSinkCommittable}s.
+     *
+     * @param committables Committables created by {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter}
+     * @return Global committer committable
+     */
+    @Override
+    public PinotSinkGlobalCommittable combine(List<PinotSinkCommittable> committables) {
+        List<String> dataFilePaths = new ArrayList<>();
+        long minTimestamp = Long.MAX_VALUE;
+        long maxTimestamp = Long.MIN_VALUE;
+
+        // Extract all data file paths and the overall minimum and maximum timestamps
+        // from all committables
+        for (PinotSinkCommittable committable : committables) {
+            dataFilePaths.add(committable.getDataFilePath());
+            minTimestamp = Long.min(minTimestamp, committable.getMinTimestamp());
+            maxTimestamp = Long.max(maxTimestamp, committable.getMaxTimestamp());
+        }
+
+        LOG.info("Combined {} committables into one global committable", committables.size());
+        return new PinotSinkGlobalCommittable(dataFilePaths, minTimestamp, maxTimestamp);
+    }
+
+    /**
+     * Copies data files from shared filesystem to the local filesystem, generates segments with names
+     * according to the segment naming schema and finally pushes the segments to the Pinot cluster.
+ * Before pushing a segment, we ask the Pinot controller whether a segment with that name
+ * already exists in the Pinot cluster. If it does, the existing segment gets deleted.
+     *
+     * @param globalCommittables List of global committables
+     * @return Global committables whose commit failed
+     * @throws IOException
+     */
+    @Override
+    public List<PinotSinkGlobalCommittable> commit(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException {
+        // Retrieve the Pinot table schema and the Pinot table config from the Pinot controller
+        PinotControllerApi controllerApi = new PinotControllerApi(this.pinotControllerHost, this.pinotControllerPort);
+        Schema tableSchema = controllerApi.getSchema(this.tableName);
+        TableConfig tableConfig = controllerApi.getTableConfig(this.tableName);
+
+        // List of failed global committables that can be retried later on
+        List<PinotSinkGlobalCommittable> failedCommits = new ArrayList<>();
+
+        for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) {
+            // Make sure to remove all previously committed segments in globalCommittable
+            // when recovering from failure
+            CommitStatus commitStatus = this.getCommitStatus(globalCommittable);
+            for (String existingSegment : commitStatus.getExistingSegmentNames()) {
+                // Some but not all segments were already committed. As we cannot ensure
+                // that the data files contain the same data as before the failure,
+                // we delete the already committed segments in order to recommit them later on.
+                controllerApi.deleteSegment(tableName, existingSegment);
+            }
+
+            // We use a thread pool in order to parallelize the segment creation and segment upload
+            ExecutorService pool = Executors.newCachedThreadPool();
+            Set<Future<Boolean>> resultFutures = new HashSet<>();
+
+            // Commit all segments in globalCommittable
+            int sequenceId = 0;
+            for (String dataFilePath : globalCommittable.getDataFilePaths()) {
+                // Get segment names with increasing sequenceIds
+                String segmentName = this.getSegmentName(globalCommittable, sequenceId++);
+                // Segment committer handling the whole commit process for a single segment
+                Callable<Boolean> segmentCommitter = new SegmentCommitter(
+                        this.pinotControllerHost, this.pinotControllerPort, this.tempDirPrefix,
+                        this.fsAdapter, dataFilePath, segmentName, tableSchema, tableConfig,
+                        this.timeColumnName, this.segmentTimeUnit
+                );
+                // Submits the segment committer to the thread pool
+                resultFutures.add(pool.submit(segmentCommitter));
+            }
+
+            try {
+                for (Future<Boolean> wasSuccessful : resultFutures) {
+                    // If any of the segment commits was unsuccessful, we mark the whole
+                    // globalCommittable as failed
+                    if (!wasSuccessful.get()) {
+                        failedCommits.add(globalCommittable);
+                        // Once any of the commits has failed, we do not need to check the
+                        // remaining ones, as we will retry the whole globalCommittable next time
+                        break;
+                    }
+                }
+            } catch (Exception e) {
+                // In case of an exception mark the whole globalCommittable as failed
+                failedCommits.add(globalCommittable);
+                LOG.error(e.getMessage());
+                e.printStackTrace();
+            }
+        }
+
+        // Return failed commits so that they can be retried later on
+        return failedCommits;
+    }
+
+    /**
+     * Empty method.
+     */
+    @Override
+    public void endOfInput() {
+    }
+
+    /**
+     * Empty method, as we do not open any connections.
+     */
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Helper method for generating segment names using the segment name generator.
+     *
+     * @param globalCommittable Global committable the segment name shall be generated from
+     * @param sequenceId        Incrementing counter
+     * @return generated segment name
+     */
+    private String getSegmentName(PinotSinkGlobalCommittable globalCommittable, int sequenceId) {
+        return this.segmentNameGenerator.generateSegmentName(sequenceId, globalCommittable.getMinTimestamp(), globalCommittable.getMaxTimestamp());
+    }
+
+    /**
+     * Evaluates the status of already uploaded segments by requesting segment metadata from the
+     * Pinot controller.
+     *
+     * @param globalCommittable Global committable whose commit status gets evaluated
+     * @return Commit status
+     * @throws IOException
+     */
+    private CommitStatus getCommitStatus(PinotSinkGlobalCommittable globalCommittable) throws IOException {
+        PinotControllerApi controllerApi = new PinotControllerApi(this.pinotControllerHost, this.pinotControllerPort);
+
+        List<String> existingSegmentNames = new ArrayList<>();
+        List<String> missingSegmentNames = new ArrayList<>();
+
+        // For all segment names that will be used to submit new segments, check whether the segment
+        // name already exists for the target table
+        for (int sequenceId = 0; sequenceId < globalCommittable.getDataFilePaths().size(); sequenceId++) {
+            String segmentName = this.getSegmentName(globalCommittable, sequenceId);
+            if (controllerApi.tableHasSegment(this.tableName, segmentName)) {
+                // Segment name already exists
+                existingSegmentNames.add(segmentName);
+            } else {
+                // Segment name does not exist yet
+                missingSegmentNames.add(segmentName);
+            }
+        }
+
+        return new CommitStatus(existingSegmentNames, missingSegmentNames);
+    }
+
+    /**
+     * Wrapper for existing and missing segments in the Pinot cluster.
+     */
+    static class CommitStatus {
+        private final List<String> existingSegmentNames;
+        private final List<String> missingSegmentNames;
+
+        public CommitStatus(List<String> existingSegmentNames, List<String> missingSegmentNames) {
+            this.existingSegmentNames = existingSegmentNames;
+            this.missingSegmentNames = missingSegmentNames;
+        }
+
+        public List<String> getExistingSegmentNames() {
+            return existingSegmentNames;
+        }
+
+        public List<String> getMissingSegmentNames() {
+            return missingSegmentNames;
+        }
+    }
+
+    /**
+     * Helper class for committing a single segment. Downloads a data file from the shared filesystem,
+     * generates a segment from the data file and uploads the segment to the Pinot controller.
+     */
+    static class SegmentCommitter implements Callable<Boolean> {
+
+        private static final Logger LOG = LoggerFactory.getLogger(SegmentCommitter.class);
+
+        final String pinotControllerHost;
+        final String pinotControllerPort;
+        final String tempDirPrefix;
+        final FileSystemAdapter fsAdapter;
+        final String dataFilePath;
+        final String segmentName;
+        final Schema tableSchema;
+        final TableConfig tableConfig;
+        final String timeColumnName;
+        final TimeUnit segmentTimeUnit;
+
+        /**
+         * @param pinotControllerHost Host of the Pinot controller
+         * @param pinotControllerPort Port of the Pinot controller
+         * @param fsAdapter           Filesystem adapter used to load data files from the shared file system
+         * @param dataFilePath        Data file to load from the shared file system
+         * @param segmentName         Name of the segment to create and commit
+         * @param tableSchema         Pinot table schema
+         * @param tableConfig         Pinot table config
+         * @param timeColumnName      Name of the column containing the timestamp
+         * @param segmentTimeUnit     Unit of the time column
+         */
+        public SegmentCommitter(String pinotControllerHost, String pinotControllerPort, String tempDirPrefix, FileSystemAdapter fsAdapter, String dataFilePath, String segmentName, Schema tableSchema, TableConfig tableConfig, String timeColumnName, TimeUnit segmentTimeUnit) {
+            this.pinotControllerHost = pinotControllerHost;
+            this.pinotControllerPort = pinotControllerPort;
+            this.tempDirPrefix = tempDirPrefix;
+            this.fsAdapter = fsAdapter;
+            this.dataFilePath = dataFilePath;
+            this.segmentName = segmentName;
+            this.tableSchema = tableSchema;
+            this.tableConfig = tableConfig;
+            this.timeColumnName = timeColumnName;
+            this.segmentTimeUnit = segmentTimeUnit;
+        }
+
+        /**
+         * Downloads a data file from the shared file system via {@code fsAdapter}, generates a segment
+         * and finally uploads the segment to the Pinot controller.
+         *
+         * @return True if the commit succeeded
+         */
+        @Override
+        public Boolean call() {
+            try {
+                // Download data file from the shared filesystem
+                LOG.info("Downloading data file {} from shared file system...", dataFilePath);
+                File segmentData = fsAdapter.copyToLocalFile(dataFilePath);
+                LOG.info("Successfully downloaded data file {} from shared file system", dataFilePath);
+
+                File segmentFile = Files.createTempDirectory(this.tempDirPrefix).toFile();
+                LOG.info("Creating segment in " + segmentFile.getAbsolutePath());
+
+                // Creates a segment with name `segmentName` in `segmentFile`
+                this.generateSegment(segmentData, segmentFile, true);
+
+                // Uploads the recently created segment to the Pinot controller
+                this.uploadSegment(segmentFile);
+
+                // Commit successful
+                return true;
+            } catch (IOException e) {
+                e.printStackTrace();
+                LOG.error(e.getMessage());
+
+                // Commit failed
+                return false;
+            }
+        }
+
+        /**
+         * Creates a segment from the given parameters.
+         * This method was adapted from {@link org.apache.pinot.tools.admin.command.CreateSegmentCommand}.
+         *
+         * @param dataFile                  File containing the JSON data
+         * @param outDir                    Segment target path
+         * @param _postCreationVerification Verify segment after generation
+         */
+        public void generateSegment(File dataFile, File outDir, Boolean _postCreationVerification) {
+            SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, tableSchema);
+            segmentGeneratorConfig.setSegmentName(segmentName);
+            segmentGeneratorConfig.setSegmentTimeUnit(segmentTimeUnit);
+            segmentGeneratorConfig.setTimeColumnName(timeColumnName);
+            segmentGeneratorConfig.setInputFilePath(dataFile.getPath());
+            segmentGeneratorConfig.setFormat(FileFormat.JSON);
+            segmentGeneratorConfig.setOutDir(outDir.getPath());
+            segmentGeneratorConfig.setTableName(tableConfig.getTableName());
+
+            try {
+                SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+                driver.init(segmentGeneratorConfig);
+                driver.build();
+                File indexDir = new File(outDir, segmentName);
+                LOG.info("Successfully created segment: {} in directory: {}", segmentName, indexDir);
+                if (_postCreationVerification) {
+                    LOG.info("Verifying the segment by loading it");
+                    ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, ReadMode.mmap);
+                    LOG.info("Successfully loaded segment: {} of size: {} bytes", segmentName,
+                            segment.getSegmentSizeBytes());
+                    segment.destroy();
+                }
+            } catch (Exception e) {
+                e.printStackTrace();

Review comment:
       Log this, or remove it.
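
       For instance, a sketch of the catch block with SLF4J carrying the stack trace
       (the message text is illustrative):

           } catch (Exception e) {
               // Passing the exception as the last argument makes SLF4J log the full stack trace
               LOG.error("Failed to generate segment {}", segmentName, e);
           }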

##########
File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerApi.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.http.HttpEntity;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.*;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Helpers to interact with the Pinot controller via its public API.
+ */
+public class PinotControllerApi {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PinotControllerApi.class);
+    protected final String controllerHost;
+    protected final String controllerHostPort;
+
+    /**
+     * @param controllerHost Pinot controller's host
+     * @param controllerPort Pinot controller's port
+     */
+    public PinotControllerApi(String controllerHost, String controllerPort) {
+        this.controllerHost = checkNotNull(controllerHost);
+        checkNotNull(controllerPort);
+        this.controllerHostPort = String.format("http://%s:%s", controllerHost, controllerPort);
+    }
+
+    /**
+     * Issues a request to the Pinot controller API.
+     *
+     * @param request Request to issue
+     * @return Api response
+     * @throws IOException
+     */
+    private ApiResponse execute(HttpRequestBase request) throws IOException {
+        ApiResponse result;
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();
+             CloseableHttpResponse response = httpClient.execute(request)) {
+
+            String body = EntityUtils.toString(response.getEntity());

Review comment:
       If we fail to connect to Pinot, this will fail, no?
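
       One hedged way to make that failure mode explicit: httpClient.execute throws a
       ConnectException before any response body exists when the controller is unreachable,
       so it could be wrapped with context instead of surfacing at EntityUtils.toString:

           try (CloseableHttpClient httpClient = HttpClients.createDefault();
                CloseableHttpResponse response = httpClient.execute(request)) {
               // Only read the body once we actually received a response
               String body = EntityUtils.toString(response.getEntity());
               // ... build the ApiResponse from the status line and body as before
           } catch (java.net.ConnectException e) {
               throw new IOException("Could not connect to Pinot controller at " + controllerHostPort, e);
           }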

##########
File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.committer;
+
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.streaming.connectors.pinot.PinotControllerApi;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.tools.admin.command.UploadSegmentCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The global committer takes committables from {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter},
+ * generates segments and pushes them to the Pinot controller.
+ * Note: We use a custom multithreading approach to parallelize the segment creation and upload to
+ * overcome the performance limitations resulting from using a {@link GlobalCommitter} always
+ * running at a parallelism of 1.
+ */
+public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PinotSinkGlobalCommitter.class);
+
+    private final String pinotControllerHost;
+    private final String pinotControllerPort;
+    private final String tableName;
+    private final SegmentNameGenerator segmentNameGenerator;
+    private final String tempDirPrefix;
+    private final FileSystemAdapter fsAdapter;
+    private final String timeColumnName;
+    private final TimeUnit segmentTimeUnit;
+
+    /**
+     * @param pinotControllerHost  Host of the Pinot controller
+     * @param pinotControllerPort  Port of the Pinot controller
+     * @param tableName            Target table's name
+     * @param segmentNameGenerator Pinot segment name generator
+     * @param fsAdapter            Adapter for interacting with the shared file system
+     * @param timeColumnName       Name of the column containing the timestamp
+     * @param segmentTimeUnit      Unit of the time column
+     */
+    public PinotSinkGlobalCommitter(String pinotControllerHost, String pinotControllerPort, String tableName, SegmentNameGenerator segmentNameGenerator, String tempDirPrefix, FileSystemAdapter fsAdapter, String timeColumnName, TimeUnit segmentTimeUnit) {
+        this.pinotControllerHost = checkNotNull(pinotControllerHost);
+        this.pinotControllerPort = checkNotNull(pinotControllerPort);
+        this.tableName = checkNotNull(tableName);
+        this.segmentNameGenerator = checkNotNull(segmentNameGenerator);
+        this.tempDirPrefix = checkNotNull(tempDirPrefix);
+        this.fsAdapter = checkNotNull(fsAdapter);
+        this.timeColumnName = checkNotNull(timeColumnName);
+        this.segmentTimeUnit = checkNotNull(segmentTimeUnit);
+    }
+
+    /**
+     * Identifies global committables that need to be re-committed from a list of recovered committables.
+     *
+     * @param globalCommittables List of global committables that are checked for required re-commit
+     * @return List of global committables that need to be re-committed
+     * @throws IOException
+     */
+    @Override
+    public List<PinotSinkGlobalCommittable> filterRecoveredCommittables(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException {
+        PinotControllerApi controllerApi = new PinotControllerApi(this.pinotControllerHost, this.pinotControllerPort);
+        List<PinotSinkGlobalCommittable> committablesToRetry = new ArrayList<>();
+
+        for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) {
+            CommitStatus commitStatus = this.getCommitStatus(globalCommittable);
+
+            if (commitStatus.getMissingSegmentNames().isEmpty()) {
+                // All segments were already committed. Thus, we do not need to retry the commit.
+                continue;
+            }
+
+            for (String existingSegment : commitStatus.getExistingSegmentNames()) {
+                // Some but not all segments were already committed. As we cannot ensure
+                // that the data files contain the same data as before the failure,
+                // we delete the already committed segments in order to recommit them later on.
+                controllerApi.deleteSegment(tableName, existingSegment);
+            }
+            committablesToRetry.add(globalCommittable);
+        }
+
+        return committablesToRetry;
+    }
+
+    /**
+     * Combines multiple {@link PinotSinkCommittable}s into one {@link PinotSinkGlobalCommittable}
+     * by finding the minimum and maximum timestamps from the provided {@link PinotSinkCommittable}s.
+     *
+     * @param committables Committables created by {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter}
+     * @return Global committer committable
+     */
+    @Override
+    public PinotSinkGlobalCommittable combine(List<PinotSinkCommittable> committables) {
+        List<String> dataFilePaths = new ArrayList<>();
+        long minTimestamp = Long.MAX_VALUE;
+        long maxTimestamp = Long.MIN_VALUE;
+
+        // Extract all data file paths and the overall minimum and maximum timestamps
+        // from all committables
+        for (PinotSinkCommittable committable : committables) {
+            dataFilePaths.add(committable.getDataFilePath());
+            minTimestamp = Long.min(minTimestamp, committable.getMinTimestamp());
+            maxTimestamp = Long.max(maxTimestamp, committable.getMaxTimestamp());
+        }
+
+        LOG.info("Combined {} committables into one global committable", committables.size());
+        return new PinotSinkGlobalCommittable(dataFilePaths, minTimestamp, maxTimestamp);
+    }
+
+    /**
+     * Copies data files from shared filesystem to the local filesystem, generates segments with names
+     * according to the segment naming schema and finally pushes the segments to the Pinot cluster.
+ * Before pushing a segment, we ask the Pinot controller whether a segment with that name
+ * already exists in the Pinot cluster. If it does, the existing segment gets deleted.
+     *
+     * @param globalCommittables List of global committables
+     * @return Global committables whose commit failed
+     * @throws IOException
+     */
+    @Override
+    public List<PinotSinkGlobalCommittable> commit(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException {
+        // Retrieve the Pinot table schema and the Pinot table config from the Pinot controller
+        PinotControllerApi controllerApi = new PinotControllerApi(this.pinotControllerHost, this.pinotControllerPort);
+        Schema tableSchema = controllerApi.getSchema(this.tableName);
+        TableConfig tableConfig = controllerApi.getTableConfig(this.tableName);
+
+        // List of failed global committables that can be retried later on
+        List<PinotSinkGlobalCommittable> failedCommits = new ArrayList<>();
+
+        for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) {
+            // Make sure to remove all previously committed segments in globalCommittable
+            // when recovering from failure
+            CommitStatus commitStatus = this.getCommitStatus(globalCommittable);
+            for (String existingSegment : commitStatus.getExistingSegmentNames()) {
+                // Some but not all segments were already committed. As we cannot ensure
+                // that the data files contain the same data as before the failure,
+                // we delete the already committed segments in order to recommit them later on.
+                controllerApi.deleteSegment(tableName, existingSegment);
+            }
+
+            // We use a thread pool in order to parallelize the segment creation and segment upload
+            ExecutorService pool = Executors.newCachedThreadPool();
+            Set<Future<Boolean>> resultFutures = new HashSet<>();
+
+            // Commit all segments in globalCommittable
+            int sequenceId = 0;
+            for (String dataFilePath : globalCommittable.getDataFilePaths()) {
+                // Get segment names with increasing sequenceIds
+                String segmentName = this.getSegmentName(globalCommittable, sequenceId++);
+                // Segment committer handling the whole commit process for a single segment
+                Callable<Boolean> segmentCommitter = new SegmentCommitter(
+                        this.pinotControllerHost, this.pinotControllerPort, this.tempDirPrefix,
+                        this.fsAdapter, dataFilePath, segmentName, tableSchema, tableConfig,
+                        this.timeColumnName, this.segmentTimeUnit
+                );
+                // Submits the segment committer to the thread pool
+                resultFutures.add(pool.submit(segmentCommitter));
+            }
+
+            try {
+                for (Future<Boolean> wasSuccessful : resultFutures) {
+                    // If any of the segment commits was unsuccessful, we mark the whole
+                    // globalCommittable as failed
+                    if (!wasSuccessful.get()) {
+                        failedCommits.add(globalCommittable);
+                        // Once any of the commits has failed, we do not need to check the
+                        // remaining ones, as we will retry the whole globalCommittable next time
+                        break;
+                    }
+                }
+            } catch (Exception e) {
+                // In case of an exception mark the whole globalCommittable as failed
+                failedCommits.add(globalCommittable);
+                LOG.error(e.getMessage());
+                e.printStackTrace();
+            }
+        }
+
+        // Return failed commits so that they can be retried later on
+        return failedCommits;
+    }
+
+    /**
+     * Empty method.
+     */
+    @Override
+    public void endOfInput() {
+    }
+
+    /**
+     * Empty method, as we do not open any connections.
+     */
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Helper method for generating segment names using the segment name generator.
+     *
+     * @param globalCommittable Global committable the segment name shall be generated from
+     * @param sequenceId        Incrementing counter
+     * @return generated segment name
+     */
+    private String getSegmentName(PinotSinkGlobalCommittable globalCommittable, int sequenceId) {
+        return this.segmentNameGenerator.generateSegmentName(sequenceId, globalCommittable.getMinTimestamp(), globalCommittable.getMaxTimestamp());
+    }
+
+    /**
+     * Evaluates the status of already uploaded segments by requesting segment metadata from the
+     * Pinot controller.
+     *
+     * @param globalCommittable Global committable whose commit status gets evaluated
+     * @return Commit status
+     * @throws IOException
+     */
+    private CommitStatus getCommitStatus(PinotSinkGlobalCommittable globalCommittable) throws IOException {
+        PinotControllerApi controllerApi = new PinotControllerApi(this.pinotControllerHost, this.pinotControllerPort);
+
+        List<String> existingSegmentNames = new ArrayList<>();
+        List<String> missingSegmentNames = new ArrayList<>();
+
+        // For all segment names that will be used to submit new segments, check whether the segment
+        // name already exists for the target table
+        for (int sequenceId = 0; sequenceId < globalCommittable.getDataFilePaths().size(); sequenceId++) {
+            String segmentName = this.getSegmentName(globalCommittable, sequenceId);
+            if (controllerApi.tableHasSegment(this.tableName, segmentName)) {
+                // Segment name already exists
+                existingSegmentNames.add(segmentName);
+            } else {
+                // Segment name does not exist yet
+                missingSegmentNames.add(segmentName);
+            }
+        }
+
+        return new CommitStatus(existingSegmentNames, missingSegmentNames);
+    }
+
+    /**
+     * Wrapper for existing and missing segments in the Pinot cluster.
+     */
+    static class CommitStatus {
+        private final List<String> existingSegmentNames;
+        private final List<String> missingSegmentNames;
+
+        public CommitStatus(List<String> existingSegmentNames, List<String> missingSegmentNames) {
+            this.existingSegmentNames = existingSegmentNames;
+            this.missingSegmentNames = missingSegmentNames;
+        }
+
+        public List<String> getExistingSegmentNames() {
+            return existingSegmentNames;
+        }
+
+        public List<String> getMissingSegmentNames() {
+            return missingSegmentNames;
+        }
+    }
+
+    /**
+     * Helper class for committing a single segment. Downloads a data file from the shared filesystem,
+     * generates a segment from the data file and uploads the segment to the Pinot controller.
+     */
+    static class SegmentCommitter implements Callable<Boolean> {
+
+        private static final Logger LOG = LoggerFactory.getLogger(SegmentCommitter.class);
+
+        final String pinotControllerHost;
+        final String pinotControllerPort;
+        final String tempDirPrefix;
+        final FileSystemAdapter fsAdapter;
+        final String dataFilePath;
+        final String segmentName;
+        final Schema tableSchema;
+        final TableConfig tableConfig;
+        final String timeColumnName;
+        final TimeUnit segmentTimeUnit;
+
+        /**
+         * @param pinotControllerHost Host of the Pinot controller
+         * @param pinotControllerPort Port of the Pinot controller
+         * @param fsAdapter           Filesystem adapter used to load data files from the shared file system
+         * @param dataFilePath        Data file to load from the shared file system
+         * @param segmentName         Name of the segment to create and commit
+         * @param tableSchema         Pinot table schema
+         * @param tableConfig         Pinot table config
+         * @param timeColumnName      Name of the column containing the timestamp
+         * @param segmentTimeUnit     Unit of the time column
+         */
+        public SegmentCommitter(String pinotControllerHost, String pinotControllerPort, String tempDirPrefix, FileSystemAdapter fsAdapter, String dataFilePath, String segmentName, Schema tableSchema, TableConfig tableConfig, String timeColumnName, TimeUnit segmentTimeUnit) {
+            this.pinotControllerHost = pinotControllerHost;
+            this.pinotControllerPort = pinotControllerPort;
+            this.tempDirPrefix = tempDirPrefix;
+            this.fsAdapter = fsAdapter;
+            this.dataFilePath = dataFilePath;
+            this.segmentName = segmentName;
+            this.tableSchema = tableSchema;
+            this.tableConfig = tableConfig;
+            this.timeColumnName = timeColumnName;
+            this.segmentTimeUnit = segmentTimeUnit;
+        }
+
+        /**
+         * Downloads a data file from the shared file system via {@code fsAdapter}, generates a segment
+         * and finally uploads the segment to the Pinot controller.
+         *
+         * @return True if the commit succeeded
+         */
+        @Override
+        public Boolean call() {
+            try {
+                // Download data file from the shared filesystem
+                LOG.info("Downloading data file {} from shared file system...", dataFilePath);
+                File segmentData = fsAdapter.copyToLocalFile(dataFilePath);
+                LOG.info("Successfully downloaded data file {} from shared file system", dataFilePath);
+
+                File segmentFile = Files.createTempDirectory(this.tempDirPrefix).toFile();
+                LOG.info("Creating segment in " + segmentFile.getAbsolutePath());
+
+                // Creates a segment with name `segmentName` in `segmentFile`
+                this.generateSegment(segmentData, segmentFile, true);
+
+                // Uploads the recently created segment to the Pinot controller
+                this.uploadSegment(segmentFile);
+
+                // Commit successful
+                return true;
+            } catch (IOException e) {
+                e.printStackTrace();

Review comment:
       Set this to a log statement.
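
       A sketch of what that could look like in call()'s catch block:

           } catch (IOException e) {
               // Let the logger capture the stack trace instead of printing to stderr
               LOG.error("Failed to commit segment {}", segmentName, e);
               return false;
           }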

##########
File path: flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/emulator/PinotEmulatorManager.java
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.emulator;
+
+import com.github.dockerjava.api.DockerClient;

Review comment:
       Could you use Testcontainers?
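
       A rough sketch of a Testcontainers-based setup; the image name, tag, and
       QuickStart command are assumptions that would need to be checked against the
       official Pinot Docker image:

           import org.testcontainers.containers.GenericContainer;
           import org.testcontainers.utility.DockerImageName;

           GenericContainer<?> pinot =
                   new GenericContainer<>(DockerImageName.parse("apachepinot/pinot:0.6.0"))
                           .withCommand("QuickStart", "-type", "batch")
                           .withExposedPorts(9000);
           pinot.start();
           String controllerHost = pinot.getHost();
           String controllerPort = String.valueOf(pinot.getMappedPort(9000));

       Testcontainers would then also take care of cleaning the container up after the tests.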

##########
File path: flink-connector-pinot/pom.xml
##########
@@ -0,0 +1,213 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink-parent_2.11</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-pinot_2.11</artifactId>
+    <name>flink-connector-pinot</name>
+
+
+    <packaging>jar</packaging>
+
+    <!-- Allow users to pass custom connector versions -->
+    <properties>
+        <pinot.version>0.6.0</pinot.version>
+
+        <!-- Flink version -->
+        <flink.version>1.12.0</flink.version>

Review comment:
       We should update the Flink version in the parent POM.
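
       For example, moving the property up so all modules inherit it (illustrative;
       the exact value should match what the parent already targets):

           <!-- in the bahir-flink-parent pom.xml, under <properties> -->
           <flink.version>1.12.0</flink.version>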
   

##########
File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.writer;
+
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link PinotWriterSegment} represents exactly one segment that can be found in the Pinot
+ * cluster once the commit has been completed.
+ *
+ * @param <IN> Type of incoming elements
+ */
+public class PinotWriterSegment<IN> implements Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger("PinotWriterSegment");
+
+    private final int maxRowsPerSegment;
+    private final String tempDirPrefix;
+    private final JsonSerializer<IN> jsonSerializer;
+    private final FileSystemAdapter fsAdapter;
+
+    private boolean acceptsElements = true;
+
+    private final List<IN> elements;
+    private File dataFile;
+    private long minTimestamp = Long.MAX_VALUE;
+    private long maxTimestamp = Long.MIN_VALUE;
+
+    /**
+     * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
+     * @param tempDirPrefix     Prefix for temp directories used
+     * @param jsonSerializer    Serializer used to convert elements to JSON
+     * @param fsAdapter         Filesystem adapter used to save files for sharing across nodes
+     */
+    protected PinotWriterSegment(int maxRowsPerSegment, String tempDirPrefix, JsonSerializer<IN> jsonSerializer, FileSystemAdapter fsAdapter) {
+        checkArgument(maxRowsPerSegment > 0L);
+        this.maxRowsPerSegment = maxRowsPerSegment;
+        this.tempDirPrefix = checkNotNull(tempDirPrefix);
+        this.jsonSerializer = checkNotNull(jsonSerializer);
+        this.fsAdapter = checkNotNull(fsAdapter);
+        this.elements = new ArrayList<>();
+    }
+
+    /**
+     * Takes elements and stores them in memory until either {@link #maxRowsPerSegment} is reached
+     * or {@link #prepareCommit} is called.
+     *
+     * @param element   Object from upstream task
+     * @param timestamp Timestamp assigned to element
+     * @throws IOException
+     */
+    public void write(IN element, long timestamp) throws IOException {
+        if (!this.acceptsElements()) {
+            throw new IllegalStateException("This PinotWriterSegment does not accept any elements anymore.");
+        }
+        this.elements.add(element);
+        this.minTimestamp = Long.min(this.minTimestamp, timestamp);
+        this.maxTimestamp = Long.max(this.maxTimestamp, timestamp);
+
+        // Writes elements to local filesystem once the maximum number of items is reached
+        if (this.elements.size() == this.maxRowsPerSegment) {

Review comment:
       Why do we write to the local filesystem here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org