Posted to commits@pinot.apache.org by ap...@apache.org on 2020/07/24 18:47:36 UTC

[incubator-pinot] branch thirdeye-temp updated (69b9bc4 -> abd7de7)

This is an automated email from the ASF dual-hosted git repository.

apucher pushed a change to branch thirdeye-temp
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


 discard 69b9bc4  [Deepstore by-passing]Introduce a subclasses SplitSegmentCommitter which will proceeds to commit even if the segment upload fails. (#5700)
 discard 0a82d29  request log
 discard 561a819  external logger settings in docker configs
 discard f9b248d  add log conf to Dockerfile launch script
     add 0caec1f  remove dependencies of PinotThirdEyeDataSource.DATA_SOURCE_NAME
     new be30a9f  add log conf to Dockerfile launch script
     new 0690039  external logger settings in docker configs
     new 6b57986  request log
     new abd7de7  [Deepstore by-passing]Introduce a subclasses SplitSegmentCommitter which will proceeds to commit even if the segment upload fails. (#5700)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (69b9bc4)
            \
             N -- N -- N   refs/heads/thirdeye-temp (abd7de7)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
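
The labels used in the revision list above can be illustrated with a small
Java sketch. It is a simplified model, not the logic of the notification
hook itself: the class name, the method name, and the idea of passing in
ready-made reachability sets are all assumptions made for illustration. The
revision ids in main are taken from the list above, written oldest first.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: labels revisions after a forced update of one branch.
final class RevisionLabels {

  // oldSide / newSide: revisions after the common base B on each side.
  // reachableFromOtherRefs: old revisions still referenced elsewhere.
  // alreadyInRepository: new-side revisions the repository already had.
  static Map<String, String> classify(List<String> oldSide, List<String> newSide,
      Set<String> reachableFromOtherRefs, Set<String> alreadyInRepository) {
    Map<String, String> labels = new LinkedHashMap<>();
    for (String rev : oldSide) {
      if (!newSide.contains(rev)) {
        // Dropped from this branch: "omit" if another ref keeps it alive, else "discard".
        labels.put(rev, reachableFromOtherRefs.contains(rev) ? "omit" : "discard");
      }
    }
    for (String rev : newSide) {
      if (!oldSide.contains(rev)) {
        // Added to this branch: "add" if the repository already had it, else "new".
        labels.put(rev, alreadyInRepository.contains(rev) ? "add" : "new");
      }
    }
    return labels;
  }

  public static void main(String[] args) {
    List<String> oldSide = List.of("f9b248d", "561a819", "0a82d29", "69b9bc4");
    List<String> newSide = List.of("0caec1f", "be30a9f", "0690039", "6b57986", "abd7de7");
    Map<String, String> labels = classify(oldSide, newSide, Set.of(), Set.of("0caec1f"));
    labels.forEach((rev, label) -> System.out.println(label + " " + rev));
  }
}
```

Running main prints one label per revision, in the order the revisions were
supplied.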


Summary of changes:
 .../auto/onboard/AutoOnboardPinotMetadataSource.java       | 10 ++++++----
 .../pinot/thirdeye/auto/onboard/ConfigGenerator.java       |  4 ++--
 .../apache/pinot/thirdeye/datasource/ThirdEyeRequest.java  |  2 +-
 .../datasource/mock/AutoOnboardMockDataSource.java         |  4 +++-
 .../thirdeye/datasource/pinot/PinotThirdEyeDataSource.java |  5 ++---
 .../datasource/pinot/PinotThirdEyeDataSourceConfig.java    | 14 ++++++++------
 .../pinot/PinotThirdeyeDataSourceProperties.java           |  3 ++-
 .../pinot/resources/PinotDataSourceResource.java           |  2 +-
 .../yaml/translator/DetectionConfigTranslator.java         |  2 +-
 .../org/apache/pinot/thirdeye/datalayer/DaoTestUtils.java  |  2 +-
 .../pinot/thirdeye/detection/yaml/YamlResourceTest.java    |  2 +-
 .../yaml/translator/DetectionConfigSlaTranslatorTest.java  |  2 +-
 .../yaml/translator/DetectionConfigTranslatorTest.java     |  2 +-
 13 files changed, 30 insertions(+), 24 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/04: add log conf to Dockerfile launch script

Posted by ap...@apache.org.

apucher pushed a commit to branch thirdeye-temp
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit be30a9f7ddf9933231bb02eac6e05ee59ee7bfac
Author: Alexander Pucher <al...@alexpucher.com>
AuthorDate: Thu Jul 23 16:25:20 2020 -0700

    add log conf to Dockerfile launch script
---
 docker/images/pinot-thirdeye/bin/start-thirdeye.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
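
The change below passes -Dlog4j.configurationFile=log4j2.xml, a relative
value. Log4j 2 reads that system property; how the value is then resolved is
not shown in this commit. The following Java sketch only covers the case
where the value is treated as a plain file path relative to the JVM's working
directory, which is an assumption. LogConfCheck and resolve are hypothetical
names used for illustration.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical diagnostic: shows which file a relative property value would name.
final class LogConfCheck {
  static final String PROPERTY = "log4j.configurationFile";

  // Resolves the property value against workingDir; returns null when the value is unset.
  static Path resolve(String propertyValue, Path workingDir) {
    if (propertyValue == null || propertyValue.isEmpty()) {
      return null;
    }
    return workingDir.resolve(propertyValue).toAbsolutePath().normalize();
  }

  public static void main(String[] args) {
    // Paths.get("") stands for the directory the JVM was started from.
    Path resolved = resolve(System.getProperty(PROPERTY), Paths.get(""));
    if (resolved == null) {
      System.out.println(PROPERTY + " is not set");
    } else {
      System.out.println(resolved + " exists: " + Files.exists(resolved));
    }
  }
}
```

Started with the same -D flag from the directory the launch script runs in,
this reports whether a log4j2.xml is present there.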

diff --git a/docker/images/pinot-thirdeye/bin/start-thirdeye.sh b/docker/images/pinot-thirdeye/bin/start-thirdeye.sh
index 92b8d87..56498a8 100755
--- a/docker/images/pinot-thirdeye/bin/start-thirdeye.sh
+++ b/docker/images/pinot-thirdeye/bin/start-thirdeye.sh
@@ -39,11 +39,11 @@ fi
 
 echo "Running thirdeye backend config: ${CONFIG_DIR}"
 [ -f "${CONFIG_DIR}/data-sources/data-sources-config-backend.yml" ] && cp "${CONFIG_DIR}/data-sources/data-sources-config-backend.yml" "${CONFIG_DIR}/data-sources/data-sources-config.yml"
-java -cp "./bin/thirdeye-pinot.jar" org.apache.pinot.thirdeye.anomaly.ThirdEyeAnomalyApplication "${CONFIG_DIR}" &
+java -Dlog4j.configurationFile=log4j2.xml -cp "./bin/thirdeye-pinot.jar" org.apache.pinot.thirdeye.anomaly.ThirdEyeAnomalyApplication "${CONFIG_DIR}" &
 sleep 10
 
 echo "Running thirdeye frontend config: ${CONFIG_DIR}"
 [ -f "${CONFIG_DIR}/data-sources/data-sources-config-frontend.yml" ] && cp "${CONFIG_DIR}/data-sources/data-sources-config-frontend.yml" "${CONFIG_DIR}/data-sources/data-sources-config.yml"
-java -cp "./bin/thirdeye-pinot.jar" org.apache.pinot.thirdeye.dashboard.ThirdEyeDashboardApplication "${CONFIG_DIR}" &
+java -Dlog4j.configurationFile=log4j2.xml -cp "./bin/thirdeye-pinot.jar" org.apache.pinot.thirdeye.dashboard.ThirdEyeDashboardApplication "${CONFIG_DIR}" &
 
 wait


[incubator-pinot] 02/04: external logger settings in docker configs

Posted by ap...@apache.org.

apucher pushed a commit to branch thirdeye-temp
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 069003987628ca77b42f811213afea798ed83fc9
Author: Alexander Pucher <al...@alexpucher.com>
AuthorDate: Thu Jul 23 17:13:20 2020 -0700

    external logger settings in docker configs
---
 docker/images/pinot-thirdeye/config/ephemeral/dashboard.yml        | 2 ++
 docker/images/pinot-thirdeye/config/ephemeral/detector.yml         | 4 +---
 docker/images/pinot-thirdeye/config/pinot-quickstart/dashboard.yml | 2 ++
 docker/images/pinot-thirdeye/config/pinot-quickstart/detector.yml  | 4 +---
 4 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/docker/images/pinot-thirdeye/config/ephemeral/dashboard.yml b/docker/images/pinot-thirdeye/config/ephemeral/dashboard.yml
index 09c4e86..53d956c 100644
--- a/docker/images/pinot-thirdeye/config/ephemeral/dashboard.yml
+++ b/docker/images/pinot-thirdeye/config/ephemeral/dashboard.yml
@@ -1,3 +1,5 @@
+logging:
+  type: external
 authConfig:
   authEnabled: false
   authKey: ""
diff --git a/docker/images/pinot-thirdeye/config/ephemeral/detector.yml b/docker/images/pinot-thirdeye/config/ephemeral/detector.yml
index da08eb1..9feaa33 100644
--- a/docker/images/pinot-thirdeye/config/ephemeral/detector.yml
+++ b/docker/images/pinot-thirdeye/config/ephemeral/detector.yml
@@ -1,7 +1,5 @@
 logging:
-  level: INFO
-  loggers:
-    org.hibernate.engine.internal: WARN
+  type: external
 server:
   type: default
   rootPath: '/api/*'
diff --git a/docker/images/pinot-thirdeye/config/pinot-quickstart/dashboard.yml b/docker/images/pinot-thirdeye/config/pinot-quickstart/dashboard.yml
index b7b4b47..a31d77c 100644
--- a/docker/images/pinot-thirdeye/config/pinot-quickstart/dashboard.yml
+++ b/docker/images/pinot-thirdeye/config/pinot-quickstart/dashboard.yml
@@ -1,3 +1,5 @@
+logging:
+  type: external
 authConfig:
   authEnabled: false
   authKey: ""
diff --git a/docker/images/pinot-thirdeye/config/pinot-quickstart/detector.yml b/docker/images/pinot-thirdeye/config/pinot-quickstart/detector.yml
index 6bca237..a13b517 100644
--- a/docker/images/pinot-thirdeye/config/pinot-quickstart/detector.yml
+++ b/docker/images/pinot-thirdeye/config/pinot-quickstart/detector.yml
@@ -1,7 +1,5 @@
 logging:
-  level: INFO
-  loggers:
-    org.hibernate.engine.internal: WARN
+  type: external
 server:
   type: default
   rootPath: '/api/*'


[incubator-pinot] 04/04: [Deepstore by-passing]Introduce a subclasses SplitSegmentCommitter which will proceeds to commit even if the segment upload fails. (#5700)

Posted by ap...@apache.org.

apucher pushed a commit to branch thirdeye-temp
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit abd7de7cb414bb8899bb4fe1be469ad6b97d8703
Author: Ting Chen <ti...@uber.com>
AuthorDate: Thu Jul 23 17:13:34 2020 -0700

    [Deepstore by-passing]Introduce a subclasses SplitSegmentCommitter which will proceeds to commit even if the segment upload fails. (#5700)
    
    * First commit to enable by passing segment store in LLC.
    
    * Fix a compilation issue.
    
    * Fix an unit test.
    
    * Refactor the segment committer factory api.
    
    * Introduction a new constant for peer download scheme.
    
    * Fix a typo.
    
    * Add a TODO on how to control split commit behavior and refactor the uploadSegment method.
    
    * Remove unused vars.
---
 .../apache/pinot/common/utils/CommonConstants.java |  1 +
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  8 ++--
 .../PinotLLCRealtimeSegmentManagerTest.java        |  3 +-
 .../helix/core/realtime/SegmentCompletionTest.java |  3 +-
 .../realtime/LLRealtimeSegmentDataManager.java     | 24 +++--------
 .../realtime/PeerSchemeSplitSegmentCommitter.java  | 48 ++++++++++++++++++++++
 .../manager/realtime/PinotFSSegmentUploader.java   |  2 +
 .../manager/realtime/SegmentCommitterFactory.java  | 37 +++++++++++++----
 .../manager/realtime/SplitSegmentCommitter.java    | 14 ++++++-
 .../segment/index/loader/IndexLoadingConfig.java   |  7 ++++
 .../realtime/LLRealtimeSegmentDataManagerTest.java |  2 +
 11 files changed, 117 insertions(+), 32 deletions(-)
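
The core idea of this commit, a subclass that overrides the upload step so
the commit proceeds even when the upload fails, can be sketched in
self-contained Java. This is a reduced illustration, not the Pinot classes
from the diff below: Uploader, SplitCommitter, PeerSchemeCommitter and the
returned status strings are stand-ins, and String.join is used here in place
of Pinot's StringUtil.join on the assumption that it joins the same way.

```java
// Stand-in types only; see the diff below for the real classes.
final class PeerFallbackSketch {
  static final String PEER_SEGMENT_DOWNLOAD_SCHEME = "peer://";

  // Stand-in for SegmentUploader: returns the segment location, or null on failure.
  interface Uploader {
    String upload(String segmentName);
  }

  static class SplitCommitter {
    protected final Uploader uploader;

    SplitCommitter(Uploader uploader) {
      this.uploader = uploader;
    }

    // Returns null iff the segment upload fails.
    protected String uploadSegment(String segmentName) {
      return uploader.upload(segmentName);
    }

    // A failed upload aborts the commit in the base class.
    String commit(String segmentName) {
      String location = uploadSegment(segmentName);
      return location == null ? "FAILED" : "COMMITTED at " + location;
    }
  }

  static class PeerSchemeCommitter extends SplitCommitter {
    PeerSchemeCommitter(Uploader uploader) {
      super(uploader);
    }

    // Never returns null: a failed upload yields peer:///<segmentName> instead.
    @Override
    protected String uploadSegment(String segmentName) {
      String location = uploader.upload(segmentName);
      if (location == null) {
        return String.join("/", PEER_SEGMENT_DOWNLOAD_SCHEME, segmentName);
      }
      return location;
    }
  }

  // Same prefix check as isPeerURL in the controller diff below.
  static boolean isPeerUrl(String segmentLocation) {
    return segmentLocation != null
        && segmentLocation.toLowerCase().startsWith(PEER_SEGMENT_DOWNLOAD_SCHEME);
  }

  public static void main(String[] args) {
    Uploader failing = name -> null; // simulates an unavailable segment store
    System.out.println(new SplitCommitter(failing).commit("segment1"));
    System.out.println(new PeerSchemeCommitter(failing).commit("segment1"));
  }
}
```

Joining "peer://" and the segment name with "/" gives three slashes, which
matches the peer:///segment1 form used in the tests in this commit.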

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 7c4a7b2..3e25512 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -343,6 +343,7 @@ public class CommonConstants {
     public static final String FLUSH_THRESHOLD_SIZE = "segment.flush.threshold.size";
     public static final String FLUSH_THRESHOLD_TIME = "segment.flush.threshold.time";
     public static final String PARTITION_METADATA = "segment.partition.metadata";
+    public static final String PEER_SEGMENT_DOWNLOAD_SCHEME = "peer://";
     /**
      * This field is used for parallel push protection to lock the segment globally.
      * We put the segment upload start timestamp so that if the previous push failed without unlock the segment, the
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 1691553..06cc58d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -398,8 +398,9 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   private boolean isPeerSegmentDownloadScheme(CommittingSegmentDescriptor committingSegmentDescriptor) {
-    return !(committingSegmentDescriptor == null) && !(committingSegmentDescriptor.getSegmentLocation() == null) &&
-        committingSegmentDescriptor.getSegmentLocation().toLowerCase().startsWith("peer://");
+    return !(committingSegmentDescriptor == null) && !(committingSegmentDescriptor.getSegmentLocation() == null)
+        && committingSegmentDescriptor.getSegmentLocation().toLowerCase()
+        .startsWith(CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME);
   }
 
   /**
@@ -514,7 +515,8 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   private boolean isPeerURL(String segmentLocation) {
-    return segmentLocation != null && segmentLocation.toLowerCase().startsWith("peer://");
+    return segmentLocation != null && segmentLocation.toLowerCase()
+        .startsWith(CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME);
   }
 
   /**
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index acd4664..e15d28c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -45,6 +45,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.CommonConstants.Helix;
 import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
@@ -801,7 +802,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     // Test case 2: segment location with peer format: peer://segment1, verify that an empty string is stored in zk.
     committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
-    String peerSegmentLocation = "peer:///segment1";
+    String peerSegmentLocation = CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + "/segment1";
     committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment,
         new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, peerSegmentLocation);
     committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 2723063..0166f9f 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -320,7 +320,8 @@ public class SegmentCompletionTest {
   @Test
   public void testHappyPathSplitCommitWithPeerDownloadScheme()
       throws Exception {
-    testHappyPathSplitCommit(5L, "peer:///segment1", "peer:///segment1");
+    testHappyPathSplitCommit(5L, CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + "/segment1",
+        CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + "/segment1");
   }
 
   @Test
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 0a8b27d..8944e15 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -846,24 +846,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
 
     SegmentCommitter segmentCommitter;
-
-    if (isSplitCommit) {
-      // TODO: make segment uploader used in the segment committer configurable.
-      SegmentUploader segmentUploader;
-      try {
-        segmentUploader =
-            new Server2ControllerSegmentUploader(segmentLogger, _protocolHandler.getFileUploadDownloadClient(),
-                _protocolHandler.getSegmentCommitUploadURL(params, controllerVipUrl), _segmentNameStr,
-                ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), _serverMetrics);
-      } catch (URISyntaxException e) {
-        segmentLogger.error("Segment commit upload url error: ", e);
-        return SegmentCompletionProtocol.RESP_NOT_SENT;
-      }
-      segmentCommitter = _segmentCommitterFactory.createSplitSegmentCommitter(params, segmentUploader);
-    } else {
-      segmentCommitter = _segmentCommitterFactory.createDefaultSegmentCommitter(params);
+    try {
+      segmentCommitter = _segmentCommitterFactory.createSegmentCommitter(isSplitCommit, params, controllerVipUrl);
+    } catch (URISyntaxException e) {
+      segmentLogger.error("Failed to create a segment committer: ", e);
+      return SegmentCompletionProtocol.RESP_NOT_SENT;
     }
-
     return segmentCommitter.commit(_segmentBuildDescriptor);
   }
 
@@ -1268,7 +1256,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       _consumeEndTime = now + minConsumeTimeMillis;
     }
 
-    _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger, _protocolHandler);
+    _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics);
 
     segmentLogger
         .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", _llcSegmentName,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java
new file mode 100644
index 0000000..118f0ff
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.io.File;
+import java.net.URI;
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.slf4j.Logger;
+
+
+public class PeerSchemeSplitSegmentCommitter extends SplitSegmentCommitter {
+  public PeerSchemeSplitSegmentCommitter(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler,
+      SegmentCompletionProtocol.Request.Params params, SegmentUploader segmentUploader) {
+    super(segmentLogger, protocolHandler, params, segmentUploader);
+  }
+
+  // Always return a uri string even if the segment upload fails and returns a null uri.
+  // If the segment upload fails, put peer:///segment_name in the segment location to notify the controller it is a
+  // peer download scheme.
+  protected String uploadSegment(File segmentTarFile, SegmentUploader segmentUploader,
+      SegmentCompletionProtocol.Request.Params params) {
+    URI segmentLocation = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(params.getSegmentName()));
+    if (segmentLocation == null) {
+      return StringUtil.join("/", CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME, params.getSegmentName());
+    }
+    return segmentLocation.toString();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
index 5dae4e4..75a7fff 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
@@ -40,6 +40,8 @@ import org.slf4j.LoggerFactory;
  */
 public class PinotFSSegmentUploader implements SegmentUploader {
   private Logger LOGGER = LoggerFactory.getLogger(PinotFSSegmentUploader.class);
+  public static final int DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS = 10 * 1000;
+
   private String _segmentStoreUriStr;
   private ExecutorService _executorService = Executors.newCachedThreadPool();
   private int _timeoutInMs;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index df0f6b8..2d8154e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -18,8 +18,12 @@
  */
 package org.apache.pinot.core.data.manager.realtime;
 
+import java.net.URISyntaxException;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.slf4j.Logger;
 
 
@@ -29,18 +33,37 @@ import org.slf4j.Logger;
 public class SegmentCommitterFactory {
   private static Logger LOGGER;
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
+  private final TableConfig _tableConfig;
+  private final ServerMetrics _serverMetrics;
+  private final IndexLoadingConfig _indexLoadingConfig;
 
-  public SegmentCommitterFactory(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler) {
+  public SegmentCommitterFactory(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler,
+      TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig, ServerMetrics serverMetrics) {
     LOGGER = segmentLogger;
     _protocolHandler = protocolHandler;
+    _tableConfig = tableConfig;
+    _indexLoadingConfig = indexLoadingConfig;
+    _serverMetrics = serverMetrics;
   }
 
-  public SegmentCommitter createSplitSegmentCommitter(SegmentCompletionProtocol.Request.Params params,
-      SegmentUploader segmentUploader) {
-    return new SplitSegmentCommitter(LOGGER, _protocolHandler, params, segmentUploader);
-  }
+  public SegmentCommitter createSegmentCommitter(boolean isSplitCommit, SegmentCompletionProtocol.Request.Params params,
+      String controllerVipUrl)
+      throws URISyntaxException {
+    if (!isSplitCommit) {
+      return new DefaultSegmentCommitter(LOGGER, _protocolHandler, params);
+    }
+    SegmentUploader segmentUploader;
+    // TODO Instead of using a peer segment download scheme to control how the servers do split commit, we should use
+    // other configs such as server or controller configs or controller responses to the servers.
+    if (_tableConfig.getValidationConfig().getPeerSegmentDownloadScheme() != null) {
+      segmentUploader = new PinotFSSegmentUploader(_indexLoadingConfig.getSegmentStoreURI(),
+          PinotFSSegmentUploader.DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS);
+      return new PeerSchemeSplitSegmentCommitter(LOGGER, _protocolHandler, params, segmentUploader);
+    }
 
-  public SegmentCommitter createDefaultSegmentCommitter(SegmentCompletionProtocol.Request.Params params) {
-    return new DefaultSegmentCommitter(LOGGER, _protocolHandler, params);
+    segmentUploader = new Server2ControllerSegmentUploader(LOGGER, _protocolHandler.getFileUploadDownloadClient(),
+        _protocolHandler.getSegmentCommitUploadURL(params, controllerVipUrl), params.getSegmentName(),
+        ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), _serverMetrics);
+    return new SplitSegmentCommitter(LOGGER, _protocolHandler, params, segmentUploader);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
index 8d73498..33b2ac0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
@@ -56,11 +56,11 @@ public class SplitSegmentCommitter implements SegmentCommitter {
       return SegmentCompletionProtocol.RESP_FAILED;
     }
 
-    URI segmentLocation = _segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(_params.getSegmentName()));
+    String segmentLocation = uploadSegment(segmentTarFile, _segmentUploader, _params);
     if (segmentLocation == null) {
       return SegmentCompletionProtocol.RESP_FAILED;
     }
-    _params.withSegmentLocation(segmentLocation.toString());
+    _params.withSegmentLocation(segmentLocation);
 
     SegmentCompletionProtocol.Response commitEndResponse =
         _protocolHandler.segmentCommitEndWithMetadata(_params, segmentBuildDescriptor.getMetadataFiles());
@@ -71,4 +71,14 @@ public class SplitSegmentCommitter implements SegmentCommitter {
     }
     return commitEndResponse;
   }
+
+  // Return null iff the segment upload fails.
+  protected String uploadSegment(File segmentTarFile, SegmentUploader segmentUploader,
+      SegmentCompletionProtocol.Request.Params params) {
+    URI segmentLocation = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(params.getSegmentName()));
+    if (segmentLocation != null) {
+      return segmentLocation.toString();
+    }
+    return null;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index 6ad0f3f..3c3fb7e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -41,6 +41,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
  */
 public class IndexLoadingConfig {
   private static final int DEFAULT_REALTIME_AVG_MULTI_VALUE_COUNT = 2;
+  private static final String SEGMENT_STORE_URI = "segment.store.uri";
 
   private ReadMode _readMode = ReadMode.DEFAULT_MODE;
   private List<String> _sortedColumns = Collections.emptyList();
@@ -62,6 +63,7 @@ public class IndexLoadingConfig {
   private boolean _isRealtimeOffheapAllocation;
   private boolean _isDirectRealtimeOffheapAllocation;
   private boolean _enableSplitCommitEndWithMetadata;
+  private String _segmentStoreURI;
 
   // constructed from FieldConfig
   private Map<String, Map<String, String>> _columnProperties = new HashMap<>();
@@ -185,6 +187,7 @@ public class IndexLoadingConfig {
       _realtimeAvgMultiValueCount = Integer.valueOf(avgMultiValueCount);
     }
     _enableSplitCommitEndWithMetadata = instanceDataManagerConfig.isEnableSplitCommitEndWithMetadata();
+    _segmentStoreURI = instanceDataManagerConfig.getConfig().getProperty(SEGMENT_STORE_URI);
   }
 
   /**
@@ -332,6 +335,10 @@ public class IndexLoadingConfig {
     return _columnMinMaxValueGeneratorMode;
   }
 
+
+  public String getSegmentStoreURI() { return _segmentStoreURI; }
+
+
   /**
    * For tests only.
    */
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 646ef0a..44af74f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -44,6 +44,7 @@ import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
 import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
 import org.apache.pinot.spi.stream.PermanentConsumerException;
@@ -782,6 +783,7 @@ public class LLRealtimeSegmentDataManagerTest {
       when(dataManagerConfig.getSegmentFormatVersion()).thenReturn(null);
       when(dataManagerConfig.isEnableSplitCommit()).thenReturn(false);
       when(dataManagerConfig.isRealtimeOffHeapAllocation()).thenReturn(false);
+      when(dataManagerConfig.getConfig()).thenReturn(new PinotConfiguration());
       return dataManagerConfig;
     }
 


[incubator-pinot] 03/04: request log

Posted by ap...@apache.org.

apucher pushed a commit to branch thirdeye-temp
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 6b579867f50470fbbf438730d314f5125a96b52f
Author: Alexander Pucher <al...@alexpucher.com>
AuthorDate: Thu Jul 23 20:30:41 2020 -0700

    request log
---
 docker/images/pinot-thirdeye/config/ephemeral/dashboard.yml        | 2 ++
 docker/images/pinot-thirdeye/config/ephemeral/detector.yml         | 2 ++
 docker/images/pinot-thirdeye/config/pinot-quickstart/dashboard.yml | 2 ++
 docker/images/pinot-thirdeye/config/pinot-quickstart/detector.yml  | 2 ++
 4 files changed, 8 insertions(+)

diff --git a/docker/images/pinot-thirdeye/config/ephemeral/dashboard.yml b/docker/images/pinot-thirdeye/config/ephemeral/dashboard.yml
index 53d956c..c5298b1 100644
--- a/docker/images/pinot-thirdeye/config/ephemeral/dashboard.yml
+++ b/docker/images/pinot-thirdeye/config/ephemeral/dashboard.yml
@@ -26,6 +26,8 @@ alerterConfiguration:
     smtpHost: localhost
     smtpPort: 25
 server:
+  requestLog:
+    type: external
   type: default
   applicationConnectors:
     - type: http
diff --git a/docker/images/pinot-thirdeye/config/ephemeral/detector.yml b/docker/images/pinot-thirdeye/config/ephemeral/detector.yml
index 9feaa33..a8516ec 100644
--- a/docker/images/pinot-thirdeye/config/ephemeral/detector.yml
+++ b/docker/images/pinot-thirdeye/config/ephemeral/detector.yml
@@ -1,6 +1,8 @@
 logging:
   type: external
 server:
+  requestLog:
+    type: external
   type: default
   rootPath: '/api/*'
   applicationContextPath: /
diff --git a/docker/images/pinot-thirdeye/config/pinot-quickstart/dashboard.yml b/docker/images/pinot-thirdeye/config/pinot-quickstart/dashboard.yml
index a31d77c..b039b8d 100644
--- a/docker/images/pinot-thirdeye/config/pinot-quickstart/dashboard.yml
+++ b/docker/images/pinot-thirdeye/config/pinot-quickstart/dashboard.yml
@@ -26,6 +26,8 @@ alerterConfiguration:
     smtpHost: localhost
     smtpPort: 25
 server:
+  requestLog:
+    type: external
   type: default
   applicationConnectors:
     - type: http
diff --git a/docker/images/pinot-thirdeye/config/pinot-quickstart/detector.yml b/docker/images/pinot-thirdeye/config/pinot-quickstart/detector.yml
index a13b517..1ad3312 100644
--- a/docker/images/pinot-thirdeye/config/pinot-quickstart/detector.yml
+++ b/docker/images/pinot-thirdeye/config/pinot-quickstart/detector.yml
@@ -1,6 +1,8 @@
 logging:
   type: external
 server:
+  requestLog:
+    type: external
   type: default
   rootPath: '/api/*'
   applicationContextPath: /

