You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by ch...@apache.org on 2020/11/08 08:26:41 UTC
[griffin] branch master updated: Change connectors to connector for
datasource
This is an automated email from the ASF dual-hosted git repository.
chitralverma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new 469d47d Change connectors to connector for datasource
469d47d is described below
commit 469d47d589ba3e5e9257c42f9106388f529ba02c
Author: William Guo <gu...@apache.org>
AuthorDate: Sun Nov 8 13:56:32 2020 +0530
Change connectors to connector for datasource
Closes #586 from guoyuepeng/change_connectors_to_connector_for_datasource.
Lead-authored-by: William Guo <gu...@apache.org>
Co-authored-by: deyiyao <de...@ebay.com>
Co-authored-by: ahutsunshine <ah...@gmail.com>
Signed-off-by: Chitral Verma <ch...@gmail.com>
---
measure/src/main/resources/env-batch.json | 6 ++--
.../griffin/core/job/BatchJobOperatorImpl.java | 2 +-
.../org/apache/griffin/core/job/JobInstance.java | 15 ++-------
.../griffin/core/measure/entity/DataSource.java | 36 +++++++---------------
.../metastore/hive/HiveMetaStoreServiceImpl.java | 2 +-
.../org/apache/griffin/core/util/MeasureUtil.java | 23 +++++++-------
service/src/main/resources/env/env_batch.json | 5 ++-
.../griffin/core/measure/repo/MeasureRepoTest.java | 2 +-
.../griffin/core/util/EntityMocksHelper.java | 4 +--
.../app/job/create-job/batch/batch.component.ts | 32 +++++++++----------
.../create-job/streaming/streaming.component.ts | 32 +++++++++----------
.../src/app/job/job-detail/job-detail.component.ts | 2 +-
.../app/measure/create-measure/ac/ac.component.ts | 16 ++++------
.../app/measure/create-measure/pr/pr.component.ts | 7 ++---
.../measure-detail.component.spec.ts | 6 ++--
.../measure-detail/measure-detail.component.ts | 2 +-
ui/angular/src/app/measure/measure.component.ts | 4 +--
17 files changed, 83 insertions(+), 113 deletions(-)
diff --git a/measure/src/main/resources/env-batch.json b/measure/src/main/resources/env-batch.json
index bbec4e5..fd9261d 100644
--- a/measure/src/main/resources/env-batch.json
+++ b/measure/src/main/resources/env-batch.json
@@ -7,14 +7,14 @@
},
"sinks": [
{
- "name": "consoleSink",
+ "name": "console",
"type": "CONSOLE",
"config": {
"max.log.lines": 10
}
},
{
- "name": "hdfsSink",
+ "name": "hdfs",
"type": "HDFS",
"config": {
"path": "hdfs://localhost/griffin/batch/persist",
@@ -23,7 +23,7 @@
}
},
{
- "name": "elasticSink",
+ "name": "elasticsearch",
"type": "ELASTICSEARCH",
"config": {
"method": "post",
diff --git a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
index b46ea8b..9eb60ad 100644
--- a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
@@ -403,7 +403,7 @@ public class BatchJobOperatorImpl implements JobOperator {
Set<String> sets = new HashSet<>();
List<DataSource> sources = measure.getDataSources();
for (DataSource source : sources) {
- source.getConnectors().forEach(dc -> sets.add(dc.getName()));
+ sets.add(source.getConnector().getName());
}
if (sets.size() < sources.size()) {
LOGGER.warn("Connector names cannot be repeated.");
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
index 3899147..344cf75 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
@@ -146,8 +146,7 @@ public class JobInstance implements Job {
jobStartTime = triggerTime.getTime();
}
- private void setSourcesPartitionsAndPredicates(List<DataSource> sources)
- throws Exception {
+ private void setSourcesPartitionsAndPredicates(List<DataSource> sources) {
boolean isFirstBaseline = true;
for (JobDataSegment jds : job.getSegments()) {
if (jds.isAsTsBaseline() && isFirstBaseline) {
@@ -157,22 +156,14 @@ public class JobInstance implements Job {
isFirstBaseline = false;
}
for (DataSource ds : sources) {
- setDataSourcePartitions(jds, ds);
+ setDataConnectorPartitions(jds, ds.getConnector());
}
}
}
- private void setDataSourcePartitions(JobDataSegment jds, DataSource ds)
- throws Exception {
- List<DataConnector> connectors = ds.getConnectors();
- for (DataConnector dc : connectors) {
- setDataConnectorPartitions(jds, dc);
- }
- }
-
private void setDataConnectorPartitions(
JobDataSegment jds,
- DataConnector dc) throws Exception {
+ DataConnector dc) {
String dcName = jds.getDataConnectorName();
if (dcName.equals(dc.getName())) {
Long[] sampleTs = genSampleTs(jds.getSegmentRange(), dc);
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
index 970977b..378b397 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
@@ -28,21 +28,10 @@ import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
-import javax.persistence.CascadeType;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.FetchType;
-import javax.persistence.JoinColumn;
-import javax.persistence.OneToMany;
-import javax.persistence.PostLoad;
-import javax.persistence.PrePersist;
-import javax.persistence.PreUpdate;
-import javax.persistence.Transient;
+import javax.persistence.*;
import org.apache.griffin.core.util.JsonUtil;
-import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
@Entity
@@ -51,10 +40,10 @@ public class DataSource extends AbstractAuditableEntity {
private String name;
- @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST,
+ @OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST,
CascadeType.REMOVE, CascadeType.MERGE})
@JoinColumn(name = "data_source_id")
- private List<DataConnector> connectors = new ArrayList<>();
+ private DataConnector connector = new DataConnector();
private boolean baseline = false;
@@ -75,15 +64,12 @@ public class DataSource extends AbstractAuditableEntity {
this.name = name;
}
- public List<DataConnector> getConnectors() {
- return connectors;
+ public DataConnector getConnector() {
+ return connector;
}
- public void setConnectors(List<DataConnector> connectors) {
- if (CollectionUtils.isEmpty(connectors)) {
- throw new NullPointerException("Data connector can not be empty.");
- }
- this.connectors = connectors;
+ public void setConnector(DataConnector connector) {
+ this.connector = connector;
}
public boolean isBaseline() {
@@ -132,18 +118,18 @@ public class DataSource extends AbstractAuditableEntity {
public DataSource() {
}
- public DataSource(String name, List<DataConnector> connectors) {
+ public DataSource(String name, DataConnector connector) {
this.name = name;
- this.connectors = connectors;
+ this.connector = connector;
}
public DataSource(String name, boolean baseline,
Map<String, Object> checkpointMap,
- List<DataConnector> connectors) {
+ DataConnector connector) {
this.name = name;
this.baseline = baseline;
this.checkpointMap = checkpointMap;
- this.connectors = connectors;
+ this.connector = connector;
}
}
diff --git a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
index d855183..4b83907 100644
--- a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
@@ -49,7 +49,7 @@ public class HiveMetaStoreServiceImpl implements HiveMetaStoreService {
private static final Logger LOGGER = LoggerFactory
.getLogger(HiveMetaStoreService.class);
- @Autowired
+ @Autowired(required = false)
private IMetaStoreClient client = null;
@Value("${hive.metastore.dbname}")
diff --git a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
index 7e57d87..59526bd 100644
--- a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
public class MeasureUtil {
private static final Logger LOGGER = LoggerFactory
- .getLogger(MeasureUtil.class);
+ .getLogger(MeasureUtil.class);
public static void validateMeasure(Measure measure) {
if (measure instanceof GriffinMeasure) {
@@ -56,7 +56,7 @@ public class MeasureUtil {
private static void validateGriffinMeasure(GriffinMeasure measure) {
if (getConnectorNamesIfValid(measure) == null) {
throw new GriffinException.BadRequestException
- (INVALID_CONNECTOR_NAME);
+ (INVALID_CONNECTOR_NAME);
}
if (!validatePredicates(measure)) {
throw new GriffinException.BadRequestException(INVALID_MEASURE_PREDICATE);
@@ -65,13 +65,11 @@ public class MeasureUtil {
private static boolean validatePredicates(GriffinMeasure measure) {
for (DataSource dataSource : measure.getDataSources()) {
- for (DataConnector dataConnector : dataSource.getConnectors()) {
- for (SegmentPredicate segmentPredicate : dataConnector.getPredicates()) {
- try {
- PredicatorFactory.newPredicateInstance(segmentPredicate);
- } catch (Exception e) {
- return false;
- }
+ for (SegmentPredicate segmentPredicate : dataSource.getConnector().getPredicates()) {
+ try {
+ PredicatorFactory.newPredicateInstance(segmentPredicate);
+ } catch (Exception e) {
+ return false;
}
}
}
@@ -81,7 +79,7 @@ public class MeasureUtil {
private static void validateExternalMeasure(ExternalMeasure measure) {
if (StringUtils.isBlank(measure.getMetricName())) {
LOGGER.warn("Failed to create external measure {}. " +
- "Its metric name is blank.", measure.getName());
+ "Its metric name is blank.", measure.getName());
throw new GriffinException.BadRequestException(MISSING_METRIC_NAME);
}
}
@@ -90,8 +88,9 @@ public class MeasureUtil {
Set<String> sets = new HashSet<>();
List<DataSource> sources = measure.getDataSources();
for (DataSource source : sources) {
- source.getConnectors().stream().filter(dc -> dc.getName() != null)
- .forEach(dc -> sets.add(dc.getName()));
+ if(source.getConnector() != null && source.getConnector().getName() != null){
+ sets.add(source.getConnector().getName());
+ }
}
if (sets.size() == 0 || sets.size() < sources.size()) {
LOGGER.warn("Connector names cannot be repeated or empty.");
diff --git a/service/src/main/resources/env/env_batch.json b/service/src/main/resources/env/env_batch.json
index 72a3839..9ed9ef7 100644
--- a/service/src/main/resources/env/env_batch.json
+++ b/service/src/main/resources/env/env_batch.json
@@ -4,20 +4,23 @@
},
"sinks": [
{
+ "name": "console",
"type": "CONSOLE",
"config": {
"max.log.lines": 10
}
},
{
+ "name": "hdfs",
"type": "HDFS",
"config": {
- "path": "hdfs:///griffin/persist",
+ "path": "hdfs://localhost/griffin/persist",
"max.persist.lines": 10000,
"max.lines.per.file": 10000
}
},
{
+ "name": "elasticsearch",
"type": "ELASTICSEARCH",
"config": {
"method": "post",
diff --git a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
index 00ce1fa..534527f 100644
--- a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
+++ b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
@@ -66,7 +66,7 @@ public class MeasureRepoTest {
GriffinMeasure m = (GriffinMeasure) measures.get(0);
List<DataSource> sources = m.getDataSources();
- DataConnector connector = sources.get(0).getConnectors().get(0);
+ DataConnector connector = sources.get(0).getConnector();
Rule rule = m.getEvaluateRule().getRules().get(0);
assertEquals(m.getSinksList().size(), 2);
assertEquals(sources.get(0).isBaseline(), true);
diff --git a/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java b/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java
index 563210d..6d9f053 100644
--- a/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java
+++ b/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java
@@ -82,9 +82,9 @@ public class EntityMocksHelper {
DataConnector dcTarget)
throws Exception {
DataSource dataSource = new DataSource(
- "source", true, createCheckpointMap(), Arrays.asList(dcSource));
+ "source", true, createCheckpointMap(), dcSource);
DataSource targetSource = new DataSource(
- "target", false, createCheckpointMap(), Arrays.asList(dcTarget));
+ "target", false, createCheckpointMap(), dcTarget);
List<DataSource> dataSources = new ArrayList<>();
dataSources.add(dataSource);
dataSources.add(targetSource);
diff --git a/ui/angular/src/app/job/create-job/batch/batch.component.ts b/ui/angular/src/app/job/create-job/batch/batch.component.ts
index 150f307..9049761 100644
--- a/ui/angular/src/app/job/create-job/batch/batch.component.ts
+++ b/ui/angular/src/app/job/create-job/batch/batch.component.ts
@@ -268,23 +268,21 @@ export class BatchComponent implements OnInit, AfterViewChecked {
if (measure == map.name) {
var source = map["data.sources"];
for (let i = 0; i < source.length; i++) {
- var details = source[i].connectors;
- for (let j = 0; j < details.length; j++) {
- if (details[j]["data.unit"] != undefined) {
- var table =
- details[j].config.database +
- "." +
- details[j].config["table.name"];
- var size = details[j]["data.unit"];
- var connectorname = details[j]["name"];
- var detail = {
- id: i + 1,
- name: table,
- size: size,
- connectorname: connectorname
- };
- this.dropdownList.push(detail);
- }
+ var connector = source[i].connector;
+ if (connector["data.unit"] != undefined) {
+ var table =
+ connector.config.database +
+ "." +
+ connector.config["table.name"];
+ var size = connector["data.unit"];
+ var connectorname = connector["name"];
+ var detail = {
+ id: i + 1,
+ name: table,
+ size: size,
+ connectorname: connectorname
+ };
+ this.dropdownList.push(detail);
}
}
}
diff --git a/ui/angular/src/app/job/create-job/streaming/streaming.component.ts b/ui/angular/src/app/job/create-job/streaming/streaming.component.ts
index 200d788..808631b 100644
--- a/ui/angular/src/app/job/create-job/streaming/streaming.component.ts
+++ b/ui/angular/src/app/job/create-job/streaming/streaming.component.ts
@@ -234,23 +234,21 @@ export class StreamingComponent implements OnInit {
if (measure == map.name) {
var source = map["data.sources"];
for (let i = 0; i < source.length; i++) {
- var details = source[i].connectors;
- for (let j = 0; j < details.length; j++) {
- if (details[j]["data.unit"] != undefined) {
- var table =
- details[j].config.database +
- "." +
- details[j].config["table.name"];
- var size = details[j]["data.unit"];
- var connectorname = details[j]["name"];
- var detail = {
- id: i + 1,
- name: table,
- size: size,
- connectorname: connectorname
- };
- this.dropdownList.push(detail);
- }
+ var connector = source[i].connector;
+ if (connector["data.unit"] != undefined) {
+ var table =
+ connector.config.database +
+ "." +
+ connector.config["table.name"];
+ var size = connector["data.unit"];
+ var connectorname = connector["name"];
+ var detail = {
+ id: i + 1,
+ name: table,
+ size: size,
+ connectorname: connectorname
+ };
+ this.dropdownList.push(detail);
}
}
}
diff --git a/ui/angular/src/app/job/job-detail/job-detail.component.ts b/ui/angular/src/app/job/job-detail/job-detail.component.ts
index 7c4a10b..d857b26 100644
--- a/ui/angular/src/app/job/job-detail/job-detail.component.ts
+++ b/ui/angular/src/app/job/job-detail/job-detail.component.ts
@@ -57,7 +57,7 @@ export class JobDetailComponent implements OnInit {
this.measureType = this.measureData["dq.type"].toLowerCase();
this.processType = this.measureData["process.type"].toLowerCase();
for (let item of this.measureData["data.sources"]) {
- let config = item.connectors[0].config;
+ let config = item.connector.config;
let tableName = config.database + "." + config["table.name"];
this.tableInfo.push(tableName);
}
diff --git a/ui/angular/src/app/measure/create-measure/ac/ac.component.ts b/ui/angular/src/app/measure/create-measure/ac/ac.component.ts
index 72d0138..8b7bbec 100644
--- a/ui/angular/src/app/measure/create-measure/ac/ac.component.ts
+++ b/ui/angular/src/app/measure/create-measure/ac/ac.component.ts
@@ -148,7 +148,7 @@ export class AcComponent implements OnInit, AfterViewChecked {
"data.sources": [
{
name: "source",
- connectors: [
+ connector:
{
name: "",
type: "HIVE",
@@ -170,11 +170,10 @@ export class AcComponent implements OnInit, AfterViewChecked {
}
]
}
- ]
},
{
name: "target",
- connectors: [
+ connector:
{
name: "",
type: "HIVE",
@@ -196,7 +195,6 @@ export class AcComponent implements OnInit, AfterViewChecked {
}
]
}
- ]
}
],
@@ -393,7 +391,7 @@ export class AcComponent implements OnInit, AfterViewChecked {
"data.sources": [
{
name: "source",
- connectors: [
+ connector:
{
name: this.src_name,
type: "HIVE",
@@ -415,11 +413,10 @@ export class AcComponent implements OnInit, AfterViewChecked {
}
]
}
- ]
},
{
name: "target",
- connectors: [
+ connector:
{
name: this.tgt_name,
type: "HIVE",
@@ -441,7 +438,6 @@ export class AcComponent implements OnInit, AfterViewChecked {
}
]
}
- ]
}
],
"evaluate.rule": {
@@ -499,11 +495,11 @@ export class AcComponent implements OnInit, AfterViewChecked {
}
deleteUnit(index) {
- delete this.newMeasure["data.sources"][index]["connectors"][0]["data.unit"];
+ delete this.newMeasure["data.sources"][index]["connector"]["data.unit"];
}
deletePredicates(index) {
- delete this.newMeasure["data.sources"][index]["connectors"][0]["predicates"];
+ delete this.newMeasure["data.sources"][index]["connector"]["predicates"];
}
save() {
diff --git a/ui/angular/src/app/measure/create-measure/pr/pr.component.ts b/ui/angular/src/app/measure/create-measure/pr/pr.component.ts
index a1dae5e..57be58f 100644
--- a/ui/angular/src/app/measure/create-measure/pr/pr.component.ts
+++ b/ui/angular/src/app/measure/create-measure/pr/pr.component.ts
@@ -360,7 +360,7 @@ export class PrComponent implements AfterViewChecked, OnInit {
"data.sources": [
{
name: "source",
- connectors: [
+ connector:
{
name: this.step1.srcname,
type: "HIVE",
@@ -382,7 +382,6 @@ export class PrComponent implements AfterViewChecked, OnInit {
}
]
}
- ]
}
],
"evaluate.rule": {
@@ -393,10 +392,10 @@ export class PrComponent implements AfterViewChecked, OnInit {
this.getGrouprule();
if (this.step3.size.indexOf("0") == 0) {
- delete this.newMeasure["data.sources"][0]["connectors"][0]["data.unit"];
+ delete this.newMeasure["data.sources"][0]["connector"]["data.unit"];
}
if (!this.step3.needpath || this.step3.path == "") {
- delete this.newMeasure["data.sources"][0]["connectors"][0]["predicates"];
+ delete this.newMeasure["data.sources"][0]["connector"]["predicates"];
}
this.visible = true;
setTimeout(() => (this.visibleAnimate = true), 100);
diff --git a/ui/angular/src/app/measure/measure-detail/measure-detail.component.spec.ts b/ui/angular/src/app/measure/measure-detail/measure-detail.component.spec.ts
index 6fd2f25..08d9dc4 100644
--- a/ui/angular/src/app/measure/measure-detail/measure-detail.component.spec.ts
+++ b/ui/angular/src/app/measure/measure-detail/measure-detail.component.spec.ts
@@ -43,7 +43,7 @@ describe('MeasureDetailComponent', () => {
});
it(
- 'should be created',
+ 'should be created',
inject(
[HttpTestingController, ServiceService],
(httpMock: HttpTestingController, serviceService: ServiceService) => {
@@ -54,13 +54,13 @@ describe('MeasureDetailComponent', () => {
"dq.type": "",
"evaluate.rule": "",
"data.sources": [{
- "connectors": [{
+ "connector": {
"data.unit": "",
config: {
where: ""
},
predicates: []
- }]
+ }
}]
});
diff --git a/ui/angular/src/app/measure/measure-detail/measure-detail.component.ts b/ui/angular/src/app/measure/measure-detail/measure-detail.component.ts
index 5ad5fae..7413af1 100644
--- a/ui/angular/src/app/measure/measure-detail/measure-detail.component.ts
+++ b/ui/angular/src/app/measure/measure-detail/measure-detail.component.ts
@@ -64,7 +64,7 @@ export class MeasureDetailComponent implements OnInit {
currentrule: string;
fetchData(value, index) {
- var data = this.ruleData["data.sources"][index].connectors[0];
+ var data = this.ruleData["data.sources"][index].connector;
var size = value + "size";
var zone = value + "zone";
var where = value + "where";
diff --git a/ui/angular/src/app/measure/measure.component.ts b/ui/angular/src/app/measure/measure.component.ts
index 23f7a15..d94dc3c 100644
--- a/ui/angular/src/app/measure/measure.component.ts
+++ b/ui/angular/src/app/measure/measure.component.ts
@@ -72,11 +72,11 @@ export class MeasureComponent implements OnInit {
this.deletedRow = row;
$("#save").removeAttr("disabled");
if (this.deletedRow["measure.type"] !== "external") {
- var sourcedata = this.deletedRow["data.sources"][0].connectors[0].config;
+ var sourcedata = this.deletedRow["data.sources"][0].connector.config;
this.sourceTable = sourcedata["table.name"];
}
if (this.deletedRow["dq.type"] === "accuracy") {
- var targetdata = this.deletedRow["data.sources"][1].connectors[0].config;
+ var targetdata = this.deletedRow["data.sources"][1].connector.config;
this.targetTable = targetdata["table.name"];
} else {
this.targetTable = "";