Posted to commits@kylin.apache.org by xx...@apache.org on 2020/09/03 15:41:25 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4660 Cleanup
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new 1147553 KYLIN-4660 Cleanup
1147553 is described below
commit 11475536e42bc742757599a1a53738de5e4485dd
Author: XiaoxiangYu <hi...@126.com>
AuthorDate: Thu Sep 3 20:41:01 2020 +0800
KYLIN-4660 Cleanup
---
build/conf/kylin-parquet-log4j.properties | 2 +
build/conf/kylin-server-log4j.properties | 18 +---
.../java/org/apache/kylin/common/QueryContext.java | 2 +-
.../kylin/engine/spark/utils/BuildUtils.scala | 2 +-
.../kylin/engine/spark/utils/Repartitioner.java | 16 ++--
.../kylin/engine/spark/job/TestCubeBuildJob.scala | 6 +-
kylin-spark-project/kylin-spark-query/pom.xml | 7 +-
.../org/apache/spark/sql/SparderContext.scala | 5 +-
kylin-spark-project/kylin-spark-test/pom.xml | 100 ---------------------
pom.xml | 3 +
.../kylin/rest/service/BadQueryDetector.java | 2 +-
.../apache/kylin/rest/service/QueryService.java | 6 +-
webapp/app/partials/admin/admin.html | 6 +-
13 files changed, 32 insertions(+), 143 deletions(-)
diff --git a/build/conf/kylin-parquet-log4j.properties b/build/conf/kylin-parquet-log4j.properties
index bdecb25..36b7dd4 100644
--- a/build/conf/kylin-parquet-log4j.properties
+++ b/build/conf/kylin-parquet-log4j.properties
@@ -23,6 +23,8 @@ log4j.logger.org.apache.kylin=DEBUG
log4j.logger.org.springframework=WARN
log4j.logger.org.springframework.security=WARN
log4j.logger.org.apache.spark=WARN
+# For the purpose of getting Tracking URL
+log4j.logger.org.apache.spark.deploy.yarn=INFO
log4j.logger.org.apache.spark.ContextCleaner=WARN
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
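
Context for this hunk: when Spark runs on YARN, the submission client (org.apache.spark.deploy.yarn.Client) reports the application's tracking URL at INFO level, so the logger for org.apache.spark.deploy.yarn has to be raised from WARN to INFO before Kylin can read the URL back out of the log. The relevant output typically looks along the lines of the following (illustrative sample, not taken from this commit):

    INFO yarn.Client: Application report for application_1599123456789_0001 (state: RUNNING)
    INFO yarn.Client:      tracking URL: http://<rm-host>:8088/proxy/application_1599123456789_0001/
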
diff --git a/build/conf/kylin-server-log4j.properties b/build/conf/kylin-server-log4j.properties
index 8846a21..33df731 100644
--- a/build/conf/kylin-server-log4j.properties
+++ b/build/conf/kylin-server-log4j.properties
@@ -25,25 +25,9 @@ log4j.appender.file.Append=true
log4j.appender.file.MaxFileSize=268435456
log4j.appender.file.MaxBackupIndex=10
-log4j.appender.realtime=org.apache.log4j.RollingFileAppender
-log4j.appender.realtime.layout=org.apache.log4j.PatternLayout
-log4j.appender.realtime.File=${catalina.home}/../logs/streaming_coordinator.log
-log4j.appender.realtime.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
-log4j.appender.realtime.Append=true
-log4j.appender.realtime.MaxFileSize=268435456
-log4j.appender.realtime.MaxBackupIndex=10
-
#overall config
log4j.rootLogger=INFO
log4j.logger.org.apache.kylin=DEBUG,file
log4j.logger.org.springframework=WARN,file
log4j.logger.org.springframework.security=INFO,file
-log4j.logger.org.apache.kylin.spark.classloader=INFO,file
-
-log4j.additivity.logger.org.apache.kylin.stream=false
-log4j.logger.org.apache.kylin.stream=TRACE,realtime
-log4j.logger.org.apache.kylin.job=DEBUG,realtime
-log4j.logger.org.apache.kylin.rest.service.StreamingCoordinatorService=DEBUG,realtime
-log4j.logger.org.apache.kylin.rest.service.StreamingV2Service=DEBUG,realtime
-log4j.logger.org.apache.kylin.rest.controller.StreamingCoordinatorController=DEBUG,realtime
-log4j.logger.org.apache.kylin.rest.controller.StreamingV2Controller=DEBUG,realtime
\ No newline at end of file
+log4j.logger.org.apache.kylin.spark.classloader=INFO,file
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index eb01b6e..39c2ae3 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -318,7 +318,7 @@ public class QueryContext {
public void setContextRealization(int ctxId, String realizationName, int realizationType) {
CubeSegmentStatisticsResult cubeSegmentStatisticsResult = cubeSegmentStatisticsResultMap.get(ctxId);
if (cubeSegmentStatisticsResult == null) {
- logger.warn("Cannot find CubeSegmentStatisticsResult for context " + ctxId);
+ logger.debug("Cannot find CubeSegmentStatisticsResult for context " + ctxId);
return;
}
cubeSegmentStatisticsResult.setRealization(realizationName);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/BuildUtils.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/BuildUtils.scala
index 4d0db98..d729ab7 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/BuildUtils.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/BuildUtils.scala
@@ -60,7 +60,7 @@ object BuildUtils extends Logging {
val repartitioner = new Repartitioner(
config.getParquetStorageShardSizeMB,
config.getParquetStorageRepartitionThresholdSize,
- layout.getRows,
+ layout,
repartitionThresholdSize,
summary,
shardByColumns
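
The BuildUtils change above hands the whole LayoutEntity to the Repartitioner instead of just layout.getRows, so the repartitioner can read both the row count and the cuboid id; the constructor change in the next file consumes them for the consolidated log line.
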
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
index e50bd9c..efaa7d0 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.spark.NSparkCubingEngine;
+import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -38,8 +39,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.kylin.engine.spark.NSparkCubingEngine;
-
public class Repartitioner {
private static String tempDirSuffix = "_temp";
protected static final Logger logger = LoggerFactory.getLogger(Repartitioner.class);
@@ -49,14 +48,16 @@ public class Repartitioner {
private int fileLengthThreshold;
private long totalRowCount;
private long rowCountThreshold;
+ private long cuboid;
private ContentSummary contentSummary;
private List<Integer> shardByColumns = new ArrayList<>();
- public Repartitioner(int shardSize, int fileLengthThreshold, long totalRowCount, long rowCountThreshold,
- ContentSummary contentSummary, List<Integer> shardByColumns) {
+ public Repartitioner(int shardSize, int fileLengthThreshold, LayoutEntity layoutEntity, long rowCountThreshold,
+ ContentSummary contentSummary, List<Integer> shardByColumns) {
this.shardSize = shardSize;
this.fileLengthThreshold = fileLengthThreshold;
- this.totalRowCount = totalRowCount;
+ this.totalRowCount = layoutEntity.getRows();
+ cuboid = layoutEntity.getId();
this.rowCountThreshold = rowCountThreshold;
this.contentSummary = contentSummary;
if (shardByColumns != null) {
@@ -118,10 +119,9 @@ public class Repartitioner {
public int getRepartitionNumByStorage() {
int fileLengthRepartitionNum = getFileLengthRepartitionNum();
int rowCountRepartitionNum = getRowCountRepartitionNum();
- logger.info("File length repartition num : {}, Row count Rpartition num: {}", fileLengthRepartitionNum,
- rowCountRepartitionNum);
int partitionSize = (int) Math.ceil(1.0 * (fileLengthRepartitionNum + rowCountRepartitionNum) / 2);
- logger.info("Repartition size is :{}", partitionSize);
+ logger.info("Cuboid[{}] has {} row and {} bytes. Partition count calculated by file size is {}, calculated by row count is {}, final is {}.",
+ cuboid, totalRowCount, contentSummary.getLength(), fileLengthRepartitionNum, rowCountRepartitionNum, partitionSize);
return partitionSize;
}
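
The partition count above is the mean of two estimates, rounded up: one derived from the on-disk size and one from the row count. Below is a minimal runnable sketch of that arithmetic; the two estimator formulas (one partition per shardSize MB of output, one partition per rowCountThreshold rows) are assumptions for illustration, while the final averaging line mirrors getRepartitionNumByStorage() verbatim.

    object RepartitionMath {
      // Assumed estimator: one partition per shardSizeMB of output on disk.
      def fileLengthRepartitionNum(totalBytes: Long, shardSizeMB: Int): Int =
        math.ceil(totalBytes.toDouble / (shardSizeMB * 1024L * 1024L)).toInt

      // Assumed estimator: one partition per rowCountThreshold rows.
      def rowCountRepartitionNum(totalRows: Long, rowCountThreshold: Long): Int =
        math.ceil(totalRows.toDouble / rowCountThreshold).toInt

      def main(args: Array[String]): Unit = {
        val byLength = fileLengthRepartitionNum(512L * 1024 * 1024, 128) // 4
        val byRows   = rowCountRepartitionNum(2500000L, 1000000L)        // 3
        // Verbatim logic from the patch: average the two estimates, rounding up.
        val partitionSize = math.ceil((byLength + byRows) / 2.0).toInt   // 4
        println(s"byLength=$byLength byRows=$byRows final=$partitionSize")
      }
    }

The test change below (TestCubeBuildJob.scala) exercises the same path by mocking a ContentSummary and building a LayoutEntity with a known row count and id.
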
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestCubeBuildJob.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestCubeBuildJob.scala
index 2c70f8b..1f5eb77 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestCubeBuildJob.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestCubeBuildJob.scala
@@ -28,6 +28,7 @@ import org.apache.kylin.common.KylinConfig
import org.apache.kylin.common.util.{DateFormat, HadoopUtil}
import org.apache.kylin.cube.CubeManager
import org.apache.kylin.engine.spark.metadata.MetadataConverter
+import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity
import org.apache.kylin.engine.spark.storage.ParquetStorage
import org.apache.kylin.engine.spark.utils.{BuildUtils, Repartitioner}
import org.apache.kylin.metadata.model.SegmentRange.TSRange
@@ -157,7 +158,10 @@ class TestCubeBuildJob extends WordSpec with MockFactory with SharedSparkSession
val sc = jmock(classOf[ContentSummary])
when(sc.getFileCount).thenReturn(1L)
when(sc.getLength).thenReturn(repartitionNum * 1024 * 1024L)
- val helper = new Repartitioner(1, 1, repartitionNum * 100, 100, sc, isShardByColumn)
+ val layout = new LayoutEntity
+ layout.setRows(repartitionNum * 100)
+ layout.setId(100l)
+ val helper = new Repartitioner(1, 1, layout, 100, sc, isShardByColumn)
Assert.assertEquals(repartitionNum, helper.getRepartitionNumByStorage)
helper
}
diff --git a/kylin-spark-project/kylin-spark-query/pom.xml b/kylin-spark-project/kylin-spark-query/pom.xml
index 97299eb..ba6e2af 100644
--- a/kylin-spark-project/kylin-spark-query/pom.xml
+++ b/kylin-spark-project/kylin-spark-query/pom.xml
@@ -41,12 +41,7 @@
<dependencies>
<dependency>
<groupId>org.apache.kylin</groupId>
- <artifactId>kylin-spark-metadata</artifactId>
- <version>4.0.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-spark-common</artifactId>
+ <artifactId>kylin-spark-engine</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index d40e2d1..cc1cef8 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -36,6 +36,7 @@ import org.apache.kylin.common.KylinConfig
import org.apache.kylin.spark.classloader.ClassLoaderUtils
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.apache.spark.sql.execution.datasource.KylinSourceStrategy
+import org.apache.spark.utils.YarnInfoFetcherUtils
// scalastyle:off
object SparderContext extends Logging {
@@ -66,7 +67,7 @@ object SparderContext extends Logging {
}
def appMasterTrackURL(): String = {
- if (master_app_url == null)
+ if (master_app_url != null)
master_app_url
else
"Not_initialized"
@@ -168,7 +169,7 @@ object SparderContext extends Logging {
.getContextClassLoader
.toString)
initMonitorEnv()
- master_app_url = null
+ master_app_url = YarnInfoFetcherUtils.getTrackingUrl(appid)
} catch {
case throwable: Throwable =>
logError("Error for initializing spark ", throwable)
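
Two things happen in this file: the inverted null check in appMasterTrackURL() is fixed (the method previously returned "Not_initialized" exactly when a URL was available), and master_app_url is now actually populated from YARN instead of being reset to null. YarnInfoFetcherUtils itself is not part of this diff, so the following is only a rough sketch of what such a fetcher can do with the public Hadoop client API (the object name and structure are assumptions, not Kylin's implementation):

    import org.apache.hadoop.yarn.api.records.ApplicationId
    import org.apache.hadoop.yarn.client.api.YarnClient
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    object TrackingUrlSketch {
      // Hypothetical helper: look up the tracking URL of a running YARN app.
      // ApplicationId.fromString requires Hadoop 2.8+; older versions used
      // ConverterUtils.toApplicationId instead.
      def getTrackingUrl(appId: String): String = {
        val yarnClient = YarnClient.createYarnClient()
        yarnClient.init(new YarnConfiguration())
        yarnClient.start()
        try {
          // The application report carries the same tracking URL that the
          // INFO-level org.apache.spark.deploy.yarn logs print at submit time.
          yarnClient.getApplicationReport(ApplicationId.fromString(appId)).getTrackingUrl
        } finally {
          yarnClient.stop()
        }
      }
    }
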
diff --git a/kylin-spark-project/kylin-spark-test/pom.xml b/kylin-spark-project/kylin-spark-test/pom.xml
index 6efacdb..398eb62 100644
--- a/kylin-spark-project/kylin-spark-test/pom.xml
+++ b/kylin-spark-project/kylin-spark-test/pom.xml
@@ -30,16 +30,6 @@
<artifactId>kylin-spark-test</artifactId>
<name>Apache Kylin 4.X - Integration Test</name>
- <properties>
- <hdp.version/>
- <fastBuildMode/>
- <kylin.engine/>
- <kylin.storage/>
- <sparder.enabled/>
- <!--<guava.version>20.0</guava.version>-->
- <beanutils.version>1.9.2</beanutils.version>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.kylin</groupId>
@@ -167,94 +157,4 @@
</dependency>
</dependencies>
- <build>
- <pluginManagement>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- <version>2.16</version>
- <executions>
- <execution>
- <id>integration-tests</id>
- <goals>
- <goal>integration-test</goal>
- </goals>
- </execution>
- <execution>
- <id>verify</id>
- <goals>
- <goal>verify</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <environmentVariables>
- <hdp.version>current</hdp.version>
- </environmentVariables>
- <excludes>
- <exclude>**/*$*</exclude>
- </excludes>
- <reuseForks>false</reuseForks>
- <systemProperties>
- <property>
- <name>log4j.configuration</name>
- <value>
- file:${project.basedir}/../../build/conf/kylin-tools-log4j.properties
- </value>
- </property>
- </systemProperties>
- <argLine>-Xms1G -Xmx4G -XX:PermSize=128M -XX:MaxPermSize=512M -Dhdp.version=current</argLine>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>exec-maven-plugin</artifactId>
- <version>1.5.0</version>
- <executions>
- <execution>
- <id>build_cube_with_engine</id>
- <goals>
- <goal>exec</goal>
- </goals>
- <phase>pre-integration-test</phase>
- <configuration>
- <environmentVariables>
- <hdp.version>current</hdp.version>
- </environmentVariables>
- <skip>${skipTests}</skip>
- <classpathScope>test</classpathScope>
- <executable>java</executable>
- <arguments>
- <argument>-Dhdp.version=${hdp.version}</argument>
- <argument>-DfastBuildMode=${fastBuildMode}</argument>
- <argument>-Dkylin.engine=${kylin.engine}</argument>
- <argument>-Dkylin.storage=${kylin.storage}</argument>
- <argument>-Dsparder.enabled=${sparder.enabled}</argument>
- <argument>
- -Dlog4j.configuration=file:${user.dir}/build/conf/kylin-tools-log4j.properties
- </argument>
- <argument>-classpath</argument>
- <classpath/>
- <argument>org.apache.kylin.provision.BuildCubeWithEngine
- </argument>
- <argument>-P</argument>
- <argument>${profile.id}</argument>
- </arguments>
- <workingDirectory></workingDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.jacoco</groupId>
- <artifactId>jacoco-maven-plugin</artifactId>
- <configuration>
- <destFile>${sonar.jacoco.itReportPath}</destFile>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
-
</project>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 09b1f98..d388029 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1766,6 +1766,9 @@
</reportsDirectory>
<excludes>
<exclude>**/IT*.java</exclude>
+ <exclude>org.apache.kylin.engine.spark2.NManualBuildAndQueryCuboidTest</exclude>
+ <exclude>org.apache.kylin.engine.spark2.NBuildAndQueryTest</exclude>
+ <exclude>org.apache.kylin.engine.spark2.NBadQueryAndPushDownTest</exclude>
</excludes>
<systemProperties>
<property>
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java b/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
index edabbb5..6cf63f0 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
@@ -145,7 +145,7 @@ public class BadQueryDetector extends Thread {
}
private void detectBadQuery() {
- logger.info("Detect bad query.");
+ logger.debug("Detect bad query.");
long now = System.currentTimeMillis();
ArrayList<Entry> entries = new ArrayList<Entry>(runningQueries.values());
Collections.sort(entries);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 25a4fcf..85f0c72 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -373,9 +373,9 @@ public class QueryService extends BasicService {
stringBuilder.append("Total spark scan time: ").append(response.getTotalSparkScanTime()).append("ms").append(newLine);
stringBuilder.append("Total scan bytes: ").append(response.getTotalScanBytes()).append(newLine);
stringBuilder.append("Result row count: ").append(resultRowCount).append(newLine);
- stringBuilder.append("Accept Partial: ").append(request.isAcceptPartial()).append(newLine);
- stringBuilder.append("Is Partial Result: ").append(response.isPartial()).append(newLine);
- stringBuilder.append("Hit Exception Cache: ").append(response.isHitExceptionCache()).append(newLine);
+// stringBuilder.append("Accept Partial: ").append(request.isAcceptPartial()).append(newLine);
+// stringBuilder.append("Is Partial Result: ").append(response.isPartial()).append(newLine);
+// stringBuilder.append("Hit Exception Cache: ").append(response.isHitExceptionCache()).append(newLine);
stringBuilder.append("Storage cache used: ").append(storageCacheUsed).append(newLine);
stringBuilder.append("Is Query Push-Down: ").append(isPushDown).append(newLine);
stringBuilder.append("Is Prepare: ").append(BackdoorToggles.getPrepareOnly()).append(newLine);
diff --git a/webapp/app/partials/admin/admin.html b/webapp/app/partials/admin/admin.html
index fb5acaf..d684877 100644
--- a/webapp/app/partials/admin/admin.html
+++ b/webapp/app/partials/admin/admin.html
@@ -25,9 +25,9 @@
<tab active="active['tab_instance']" ng-if="isCuratorScheduler()" heading="Instances" select="list()" ng-controller="InstanceCtrl">
<div class="col-xs-12" ng-include src="'partials/admin/instances.html'"></div>
</tab>
- <tab active="active['tab_streaming']" heading="Streaming" select="listReplicaSet()" ng-controller="AdminStreamingCtrl">
- <div class="col-xs-12" ng-include src="'partials/admin/streaming.html'"></div>
- </tab>
+<!-- <tab active="active['tab_streaming']" heading="Streaming" select="listReplicaSet()" ng-controller="AdminStreamingCtrl">-->
+<!-- <div class="col-xs-12" ng-include src="'partials/admin/streaming.html'"></div>-->
+<!-- </tab>-->
<tab active="active['tab_users']" heading="User" select="listUsers()" ng-controller="UserGroupCtrl">
<div class="col-xs-12" ng-include src="'partials/admin/user.html'"></div>
</tab>