You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2022/08/12 21:21:04 UTC
[incubator-streampipes] 01/01: [STREAMPIPES-572] Fix automatic lowercase when persisting an adapter
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch STREAMPIPES-572
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit dc18be6fbbe1dce22224db7eb5ed2744aabf873b
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Fri Aug 12 23:20:41 2022 +0200
[STREAMPIPES-572] Fix automatic lowercase when persisting an adapter
---
.../dataexplorer/commons/DataExplorerUtils.java | 5 +----
.../commons/influx/InfluxNameSanitizer.java | 10 +--------
.../dataexplorer/commons/influx/InfluxStore.java | 26 ++++++++++++----------
.../start-adapter-configuration.component.ts | 9 ++------
.../adapter-started-dialog.component.ts | 15 +------------
5 files changed, 19 insertions(+), 46 deletions(-)
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
index 12e53e2b0..4fde78355 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
@@ -54,13 +54,10 @@ public class DataExplorerUtils {
// Removes selected timestamp from event schema
removeTimestampsFromEventSchema(measure);
- // Escapes all spaces with _
- measure.setMeasureName(InfluxNameSanitizer.prepareString(measure.getMeasureName()));
-
// Appends _ to runtime names that match a reserved InfluxDB keyword
measure.getEventSchema()
.getEventProperties()
- .forEach(ep -> ep.setRuntimeName(InfluxNameSanitizer.sanitizePropertyRuntimeName(ep.getRuntimeName())));
+ .forEach(ep -> ep.setRuntimeName(InfluxNameSanitizer.renameReservedKeywords(ep.getRuntimeName())));
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxNameSanitizer.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxNameSanitizer.java
index df7860a75..302ea4586 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxNameSanitizer.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxNameSanitizer.java
@@ -20,11 +20,7 @@ package org.apache.streampipes.dataexplorer.commons.influx;
public class InfluxNameSanitizer {
- public static String prepareString(String s) {
- return s.replaceAll(" ", "_");
- }
-
- private static String renameReservedKeywords(String runtimeName) {
+ public static String renameReservedKeywords(String runtimeName) {
if (InfluxDbReservedKeywords.keywordList.stream().anyMatch(k -> k.equalsIgnoreCase(runtimeName))) {
return runtimeName + "_";
} else {
@@ -32,8 +28,4 @@ public class InfluxNameSanitizer {
}
}
- public static String sanitizePropertyRuntimeName(String runtimeName) {
- String sanitizedRuntimeName = prepareString(runtimeName);
- return renameReservedKeywords(sanitizedRuntimeName);
- }
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
index 8724b8a15..cb429fee8 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
@@ -28,7 +28,6 @@ import org.apache.streampipes.svcdiscovery.api.SpConfig;
import org.apache.streampipes.vocabulary.XSD;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
-
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
@@ -45,8 +44,6 @@ public class InfluxStore {
private static final Logger LOG = LoggerFactory.getLogger(InfluxStore.class);
- private final static Integer batchSize = 2000;
- private final static Integer flushDuration = 500;
private InfluxDB influxDb = null;
DataLakeMeasure measure;
@@ -62,7 +59,7 @@ public class InfluxStore {
measure.getEventSchema()
.getEventProperties()
.forEach(ep -> targetRuntimeNames.put(ep.getRuntimeName(),
- InfluxNameSanitizer.sanitizePropertyRuntimeName(ep.getRuntimeName())));
+ InfluxNameSanitizer.renameReservedKeywords(ep.getRuntimeName())));
connect(settings);
}
@@ -71,7 +68,7 @@ public class InfluxStore {
* Connects to the InfluxDB Server, sets the database and initializes the batch-behaviour
*
* @throws SpRuntimeException If no connection can be established or if the database could not
- * be found
+ * be found
*/
private void connect(InfluxConnectionSettings settings) throws SpRuntimeException {
// Connecting to the server
@@ -87,20 +84,22 @@ public class InfluxStore {
String databaseName = settings.getDatabaseName();
// Checking whether the database exists
- if(!databaseExists(databaseName)) {
+ if (!databaseExists(databaseName)) {
LOG.info("Database '" + databaseName + "' not found. Gets created ...");
createDatabase(databaseName);
}
// setting up the database
influxDb.setDatabase(databaseName);
+ int batchSize = 2000;
+ int flushDuration = 500;
influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS);
}
private boolean databaseExists(String dbName) {
QueryResult queryResult = influxDb.query(new Query("SHOW DATABASES", ""));
- for(List<Object> a : queryResult.getResults().get(0).getSeries().get(0).getValues()) {
- if(a.get(0).equals(dbName)) {
+ for (List<Object> a : queryResult.getResults().get(0).getSeries().get(0).getValues()) {
+ if (a.get(0).equals(dbName)) {
return true;
}
}
@@ -113,8 +112,9 @@ public class InfluxStore {
* @param dbName The name of the database which should be created
*/
private void createDatabase(String dbName) throws SpRuntimeException {
- if(!dbName.matches("^[a-zA-Z_]\\w*$")) {
- throw new SpRuntimeException("Database name '" + dbName + "' not allowed. Allowed names: ^[a-zA-Z_][a-zA-Z0-9_]*$");
+ if (!dbName.matches("^[a-zA-Z_]\\w*$")) {
+ throw new SpRuntimeException(
+ "Database name '" + dbName + "' not allowed. Allowed names: ^[a-zA-Z_][a-zA-Z0-9_]*$");
}
influxDb.query(new Query("CREATE DATABASE \"" + dbName + "\"", ""));
}
@@ -131,7 +131,8 @@ public class InfluxStore {
}
Long timestampValue = event.getFieldBySelector(measure.getTimestampField()).getAsPrimitive().getAsLong();
- Point.Builder p = Point.measurement(measure.getMeasureName()).time((long) timestampValue, TimeUnit.MILLISECONDS);
+ Point.Builder p =
+ Point.measurement(measure.getMeasureName()).time((long) timestampValue, TimeUnit.MILLISECONDS);
for (EventProperty ep : measure.getEventSchema().getEventProperties()) {
if (ep instanceof EventPropertyPrimitive) {
@@ -140,7 +141,8 @@ public class InfluxStore {
// timestamp should not be added as a field
if (!measure.getTimestampField().endsWith(runtimeName)) {
String preparedRuntimeName = targetRuntimeNames.get(runtimeName);
- PrimitiveField eventPropertyPrimitiveField = event.getFieldByRuntimeName(runtimeName).getAsPrimitive();
+ PrimitiveField eventPropertyPrimitiveField =
+ event.getFieldByRuntimeName(runtimeName).getAsPrimitive();
// store property as tag when the field is a dimension property
if ("DIMENSION_PROPERTY".equals(ep.getPropertyScope())) {
diff --git a/ui/src/app/connect/components/new-adapter/start-adapter-configuration/start-adapter-configuration.component.ts b/ui/src/app/connect/components/new-adapter/start-adapter-configuration/start-adapter-configuration.component.ts
index 3b116cee4..a498ca473 100644
--- a/ui/src/app/connect/components/new-adapter/start-adapter-configuration/start-adapter-configuration.component.ts
+++ b/ui/src/app/connect/components/new-adapter/start-adapter-configuration/start-adapter-configuration.component.ts
@@ -22,18 +22,14 @@ import {
EventProperty,
EventRateTransformationRuleDescription,
EventSchema,
- GenericAdapterSetDescription,
RemoveDuplicatesTransformationRuleDescription,
- SpecificAdapterSetDescription
} from '@streampipes/platform-services';
import { FormBuilder, FormControl, FormGroup, Validators } from '@angular/forms';
import { MatStepper } from '@angular/material/stepper';
import { AdapterStartedDialog } from '../../../dialog/adapter-started/adapter-started-dialog.component';
import { PanelType, DialogService } from '@streampipes/shared-ui';
import { ShepherdService } from '../../../../services/tour/shepherd.service';
-import { ConnectService } from '../../../services/connect.service';
import { TimestampPipe } from '../../../filter/timestamp.pipe';
-import { MatCheckboxChange } from '@angular/material/checkbox';
@Component({
selector: 'sp-start-adapter-configuration',
@@ -90,7 +86,6 @@ export class StartAdapterConfigurationComponent implements OnInit {
constructor(
private dialogService: DialogService,
private shepherdService: ShepherdService,
- private connectService: ConnectService,
private _formBuilder: FormBuilder,
private timestampPipe: TimestampPipe) {
}
@@ -100,7 +95,7 @@ export class StartAdapterConfigurationComponent implements OnInit {
this.startAdapterForm = this._formBuilder.group({});
this.startAdapterForm.addControl('adapterName', new FormControl(this.adapterDescription.name, Validators.required));
this.startAdapterForm.valueChanges.subscribe(v => this.adapterDescription.name = v.adapterName);
- this.startAdapterForm.statusChanges.subscribe((status) => {
+ this.startAdapterForm.statusChanges.subscribe(() => {
this.startAdapterSettingsFormValid = this.startAdapterForm.valid;
});
}
@@ -144,7 +139,7 @@ export class StartAdapterConfigurationComponent implements OnInit {
this.shepherdService.trigger('button-startAdapter');
- dialogRef.afterClosed().subscribe(result => {
+ dialogRef.afterClosed().subscribe(() => {
this.adapterStartedEmitter.emit();
});
}
diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
index e4c3e67fe..a6aa9a01f 100644
--- a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
+++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
@@ -42,7 +42,7 @@ export class AdapterStartedDialog implements OnInit {
adapterInstalled = false;
public adapterStatus: Message;
public streamDescription: SpDataStream;
- private pollingActive = false;
+ pollingActive = false;
public runtimeData: any;
public isSetAdapter = false;
public isTemplate = false;
@@ -92,14 +92,6 @@ export class AdapterStartedDialog implements OnInit {
const pipelineName = 'Persist ' + this.adapter.name;
let indexName = this.adapter.name
- .toLowerCase()
- .replace(/ /g, '')
- .replace(/\./g, '');
-
- // Ensure that index name is no number
- if (this.isNumber(indexName)) {
- indexName = 'sp' + indexName;
- }
const pipelineInvocation = PipelineInvocationBuilder
.create(res)
@@ -127,9 +119,4 @@ export class AdapterStartedDialog implements OnInit {
this.shepherdService.trigger('confirm_adapter_started_button');
}
- private isNumber(value: string | number): boolean {
- return ((value != null) &&
- (value !== '') &&
- !isNaN(Number(value.toString())));
- }
}