You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/02/08 07:49:57 UTC
[shardingsphere] branch master updated: Add YamlJobProgress (#9386)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7f72a69 Add YamlJobProgress (#9386)
7f72a69 is described below
commit 7f72a69831c950a2e1476216c75a6f3e95fd8466
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Mon Feb 8 15:49:22 2021 +0800
Add YamlJobProgress (#9386)
* Optimize ScalingJobPreparer
* Add YamlJobProgress
Co-authored-by: qiulu3 <Lucas209910>
---
.../core/api/impl/RegistryRepositoryAPIImpl.java | 14 +--
.../core/job/position/PlaceholderPosition.java | 28 +----
.../core/job/position/PositionInitializer.java | 12 +-
.../job/position/PositionInitializerFactory.java | 13 +-
.../core/job/position/PrimaryKeyPosition.java | 43 +++----
.../core/job/preparer/ScalingJobPreparer.java | 4 +-
.../scaling/core/job/progress/JobProgress.java | 137 +++------------------
.../job/progress/yaml/JobProgressYamlSwapper.java | 123 ++++++++++++++++++
.../core/job/progress/yaml/YamlJobProgress.java} | 45 ++++---
.../scaling/core/spi/ScalingEntry.java | 2 +-
.../scaling/core/util/ReflectionUtil.java | 15 ---
.../core/fixture/FixtureH2ScalingEntry.java | 2 +-
.../core/fixture/FixturePositionInitializer.java | 7 +-
.../core/job/position/PlaceholderPositionTest.java | 8 +-
.../core/job/position/PrimaryKeyPositionTest.java | 10 +-
.../scaling/core/job/progress/JobProgressTest.java | 103 ++++++++++++++++
.../src/test/resources/logback-test.xml | 3 +-
.../scaling/mysql/MySQLScalingEntry.java | 4 +-
.../scaling/mysql/binlog/BinlogPosition.java | 9 +-
.../scaling/mysql/component/MySQLBinlogDumper.java | 6 +-
.../mysql/component/MySQLPositionInitializer.java | 10 +-
.../scaling/mysql/binlog/BinlogPositionTest.java | 6 +-
.../scaling/postgresql/PostgreSQLScalingEntry.java | 2 +-
.../component/PostgreSQLPositionInitializer.java | 7 +-
.../scaling/postgresql/wal/WalPosition.java | 11 +-
.../scaling/postgresql/wal/WalPositionTest.java | 15 +--
26 files changed, 363 insertions(+), 276 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
index 06dbbd5..4852f33 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
@@ -46,12 +46,12 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
@Override
public void persistJobProgress(final JobContext jobContext) {
- JobProgress jobPosition = new JobProgress();
- jobPosition.setStatus(jobContext.getStatus());
- jobPosition.setDatabaseType(jobContext.getJobConfig().getHandleConfig().getDatabaseType());
- jobPosition.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jobContext));
- jobPosition.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jobContext));
- registryRepository.persist(getOffsetPath(jobContext.getJobId(), jobContext.getShardingItem()), jobPosition.toJson());
+ JobProgress jobProgress = new JobProgress();
+ jobProgress.setStatus(jobContext.getStatus());
+ jobProgress.setDatabaseType(jobContext.getJobConfig().getHandleConfig().getDatabaseType());
+ jobProgress.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jobContext));
+ jobProgress.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jobContext));
+ registryRepository.persist(getOffsetPath(jobContext.getJobId(), jobContext.getShardingItem()), jobProgress.toString());
}
private Map<String, IncrementalTaskProgress> getIncrementalTaskProgressMap(final JobContext jobContext) {
@@ -78,7 +78,7 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
} catch (final NullPointerException ex) {
log.info("job {}-{} without break point.", jobId, shardingItem);
}
- return Strings.isNullOrEmpty(data) ? null : JobProgress.fromJson(data);
+ return Strings.isNullOrEmpty(data) ? null : JobProgress.init(data);
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
index 67abb5a..f79c5e1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
@@ -17,17 +17,9 @@
package org.apache.shardingsphere.scaling.core.job.position;
-import com.google.gson.TypeAdapter;
-import com.google.gson.annotations.JsonAdapter;
-import com.google.gson.stream.JsonReader;
-import com.google.gson.stream.JsonWriter;
-
-import java.io.IOException;
-
/**
* Placeholder position.
*/
-@JsonAdapter(PlaceholderPosition.PositionTypeAdapter.class)
public final class PlaceholderPosition implements Position<PlaceholderPosition> {
@Override
@@ -35,22 +27,8 @@ public final class PlaceholderPosition implements Position<PlaceholderPosition>
return 1;
}
- /**
- * Position type adapter.
- */
- public static class PositionTypeAdapter extends TypeAdapter<PlaceholderPosition> {
-
- @Override
- public void write(final JsonWriter out, final PlaceholderPosition value) throws IOException {
- out.beginArray();
- out.endArray();
- }
-
- @Override
- public PlaceholderPosition read(final JsonReader in) throws IOException {
- in.beginArray();
- in.endArray();
- return new PlaceholderPosition();
- }
+ @Override
+ public String toString() {
+ return "";
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
index 4727d32..ab3bfbb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
@@ -23,7 +23,7 @@ import java.sql.SQLException;
/**
* Position initializer.
*/
-public interface PositionInitializer<T extends Position<?>> {
+public interface PositionInitializer {
/**
* Init position by data source.
@@ -32,5 +32,13 @@ public interface PositionInitializer<T extends Position<?>> {
* @return position
* @throws SQLException SQL exception
*/
- T init(DataSource dataSource) throws SQLException;
+ Position<?> init(DataSource dataSource) throws SQLException;
+
+ /**
+ * Init position by string data.
+ *
+ * @param data string data
+ * @return position
+ */
+ Position<?> init(String data);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java
index c6d7617..0bbf66f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java
@@ -21,7 +21,6 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
-import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
/**
* Position initializer factory.
@@ -36,17 +35,7 @@ public final class PositionInitializerFactory {
* @return position initializer
*/
@SneakyThrows(ReflectiveOperationException.class)
- public static PositionInitializer<?> newInstance(final String databaseType) {
+ public static PositionInitializer newInstance(final String databaseType) {
return ScalingEntryLoader.getInstance(databaseType).getPositionInitializer().newInstance();
}
-
- /**
- * Get position type.
- *
- * @param databaseType database type
- * @return position type
- */
- public static Class<?> getPositionClass(final String databaseType) {
- return ReflectionUtil.getInterfaceGenericClass(ScalingEntryLoader.getInstance(databaseType).getPositionInitializer());
- }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
index 39e1d95..f7325aa 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
@@ -17,27 +17,33 @@
package org.apache.shardingsphere.scaling.core.job.position;
-import com.google.gson.TypeAdapter;
-import com.google.gson.annotations.JsonAdapter;
-import com.google.gson.stream.JsonReader;
-import com.google.gson.stream.JsonWriter;
+import com.google.common.base.Preconditions;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import java.io.IOException;
-
/**
* Use primary key as position.
*/
@RequiredArgsConstructor
@Getter
-@JsonAdapter(PrimaryKeyPosition.PositionTypeAdapter.class)
public final class PrimaryKeyPosition implements Position<PrimaryKeyPosition> {
private final long beginValue;
private final long endValue;
+ /**
+ * Init by string data.
+ *
+ * @param data string data
+ * @return primary key position
+ */
+ public static PrimaryKeyPosition init(final String data) {
+ String[] array = data.split(",");
+ Preconditions.checkArgument(array.length == 2, "Unknown primary key position: " + data);
+ return new PrimaryKeyPosition(Long.parseLong(array[0]), Long.parseLong(array[1]));
+ }
+
@Override
public int compareTo(final PrimaryKeyPosition position) {
if (null == position) {
@@ -46,25 +52,8 @@ public final class PrimaryKeyPosition implements Position<PrimaryKeyPosition> {
return Long.compare(beginValue, position.beginValue);
}
- /**
- * Position type adapter.
- */
- public static class PositionTypeAdapter extends TypeAdapter<PrimaryKeyPosition> {
-
- @Override
- public void write(final JsonWriter out, final PrimaryKeyPosition value) throws IOException {
- out.beginArray();
- out.value(value.getBeginValue());
- out.value(value.getEndValue());
- out.endArray();
- }
-
- @Override
- public PrimaryKeyPosition read(final JsonReader in) throws IOException {
- in.beginArray();
- PrimaryKeyPosition result = new PrimaryKeyPosition(in.nextLong(), in.nextLong());
- in.endArray();
- return result;
- }
+ @Override
+ public String toString() {
+ return String.format("%d,%d", beginValue, endValue);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
index 11ca2c8..d258161 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
@@ -61,7 +61,9 @@ public final class ScalingJobPreparer {
private void checkDataSource(final JobContext jobContext, final DataSourceManager dataSourceManager) {
checkSourceDataSources(jobContext, dataSourceManager);
- checkTargetDataSources(jobContext, dataSourceManager);
+ if (null == jobContext.getInitProgress()) {
+ checkTargetDataSources(jobContext, dataSourceManager);
+ }
}
private void checkSourceDataSources(final JobContext jobContext, final DataSourceManager dataSourceManager) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgress.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgress.java
index 776e73f..32eabbe 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgress.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgress.java
@@ -17,28 +17,16 @@
package org.apache.shardingsphere.scaling.core.job.progress;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.TypeAdapter;
-import com.google.gson.stream.JsonReader;
-import com.google.gson.stream.JsonWriter;
import lombok.Getter;
import lombok.Setter;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.scaling.core.job.JobStatus;
-import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
import org.apache.shardingsphere.scaling.core.job.position.Position;
-import org.apache.shardingsphere.scaling.core.job.position.PositionInitializerFactory;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskDelay;
+import org.apache.shardingsphere.scaling.core.job.progress.yaml.JobProgressYamlSwapper;
+import org.apache.shardingsphere.scaling.core.job.progress.yaml.YamlJobProgress;
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
-import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Pattern;
@@ -51,9 +39,7 @@ import java.util.stream.Collectors;
@Setter
public final class JobProgress {
- private static final Gson GSON = new Gson();
-
- private static final Gson INVENTORY_POSITION_ADAPTED_GSON = new GsonBuilder().registerTypeHierarchyAdapter(Position.class, new InventoryPositionTypeAdapter()).create();
+ private static final JobProgressYamlSwapper JOB_PROGRESS_YAML_SWAPPER = new JobProgressYamlSwapper();
private JobStatus status = JobStatus.RUNNING;
@@ -63,6 +49,17 @@ public final class JobProgress {
private Map<String, IncrementalTaskProgress> incrementalTaskProgressMap;
+
+ /**
+ * Init by string data.
+ *
+ * @param data string data
+ * @return job progress
+ */
+ public static JobProgress init(final String data) {
+ return JOB_PROGRESS_YAML_SWAPPER.swapToObject(YamlEngine.unmarshal(data, YamlJobProgress.class));
+ }
+
/**
* Get incremental position.
*
@@ -86,106 +83,8 @@ public final class JobProgress {
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getPosition()));
}
- /**
- * To json.
- *
- * @return json data
- */
- public String toJson() {
- JsonObject result = new JsonObject();
- result.addProperty("status", status.name());
- result.addProperty("databaseType", databaseType);
- result.add("inventory", getInventoryJson());
- result.add("incremental", getIncrementalJson());
- return result.toString();
- }
-
- private JsonElement getInventoryJson() {
- JsonObject result = new JsonObject();
- result.add("finished", getInventoryFinishedJson());
- result.add("unfinished", getInventoryUnfinishedJson());
- return result;
- }
-
- private JsonElement getInventoryFinishedJson() {
- JsonArray result = new JsonArray();
- inventoryTaskProgressMap.entrySet().stream()
- .filter(entry -> entry.getValue().getPosition() instanceof FinishedPosition)
- .forEach(entry -> result.add(entry.getKey()));
- return result;
- }
-
- private JsonElement getInventoryUnfinishedJson() {
- JsonObject result = new JsonObject();
- inventoryTaskProgressMap.entrySet().stream()
- .filter(entry -> !(entry.getValue().getPosition() instanceof FinishedPosition))
- .forEach(entry -> result.add(entry.getKey(), GSON.toJsonTree(entry.getValue(), entry.getValue().getClass())));
- return result;
- }
-
- private JsonElement getIncrementalJson() {
- JsonObject result = new JsonObject();
- incrementalTaskProgressMap.forEach((key, value) -> result.add(key, getIncrementalJson(value)));
- return result;
- }
-
- private JsonElement getIncrementalJson(final IncrementalTaskProgress incrementalTaskProgress) {
- JsonObject result = new JsonObject();
- result.add("position", GSON.toJsonTree(incrementalTaskProgress.getPosition(), incrementalTaskProgress.getPosition().getClass()));
- result.add("delay", GSON.toJsonTree(incrementalTaskProgress.getIncrementalTaskDelay()));
- return result;
- }
-
- /**
- * From json.
- *
- * @param data json data
- * @return job position
- */
- public static JobProgress fromJson(final String data) {
- JobProgress result = new JobProgress();
- JsonObject jsonObject = GSON.fromJson(data, JsonObject.class);
- result.setStatus(JobStatus.valueOf(jsonObject.get("status").getAsString()));
- result.setDatabaseType(jsonObject.get("databaseType").getAsString());
- result.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jsonObject.get("inventory").getAsJsonObject()));
- result.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jsonObject.get("incremental").getAsJsonObject(), jsonObject.get("databaseType").getAsString()));
- return result;
- }
-
- private static Map<String, InventoryTaskProgress> getInventoryTaskProgressMap(final JsonObject inventory) {
- JsonObject jsonObject = new JsonObject();
- jsonObject.add("inventory", inventory);
- return INVENTORY_POSITION_ADAPTED_GSON.fromJson(jsonObject, JobProgress.class).inventoryTaskProgressMap;
- }
-
- private static Map<String, IncrementalTaskProgress> getIncrementalTaskProgressMap(final JsonObject incremental, final String databaseType) {
- Class<?> incrementalPositionClass = PositionInitializerFactory.getPositionClass(databaseType);
- Map<String, IncrementalTaskProgress> result = Maps.newHashMap();
- for (String each : incremental.keySet()) {
- Position<?> position = (Position<?>) GSON.fromJson(incremental.get(each).getAsJsonObject().get("position"), incrementalPositionClass);
- IncrementalTaskDelay incrementalTaskDelay = GSON.fromJson(incremental.get(each).getAsJsonObject().get("delay"), IncrementalTaskDelay.class);
- result.put(each, new IncrementalTaskProgress(position, incrementalTaskDelay));
- }
- return result;
- }
-
- private static class InventoryPositionTypeAdapter extends TypeAdapter<Position<?>> {
-
- @Override
- public void write(final JsonWriter out, final Position<?> value) throws IOException {
- if (value instanceof PrimaryKeyPosition) {
- new PrimaryKeyPosition.PositionTypeAdapter().write(out, (PrimaryKeyPosition) value);
- } else if (value instanceof PlaceholderPosition) {
- new PlaceholderPosition.PositionTypeAdapter().write(out, (PlaceholderPosition) value);
- }
- }
-
- @Override
- public Position<?> read(final JsonReader in) throws IOException {
- in.beginArray();
- Position<?> result = in.hasNext() ? new PrimaryKeyPosition(in.nextLong(), in.nextLong()) : new PlaceholderPosition();
- in.endArray();
- return result;
- }
+ @Override
+ public String toString() {
+ return YamlEngine.marshal(JOB_PROGRESS_YAML_SWAPPER.swapToYaml(this));
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/yaml/JobProgressYamlSwapper.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/yaml/JobProgressYamlSwapper.java
new file mode 100644
index 0000000..7996273
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/yaml/JobProgressYamlSwapper.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.progress.yaml;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import org.apache.shardingsphere.scaling.core.job.JobStatus;
+import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializerFactory;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.progress.JobProgress;
+import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Job progress yaml swapper.
+ */
+public final class JobProgressYamlSwapper {
+
+ /**
+ * Swap to yaml.
+ *
+ * @param jobProgress job progress
+ * @return yaml job progress
+ */
+ public YamlJobProgress swapToYaml(final JobProgress jobProgress) {
+ YamlJobProgress result = new YamlJobProgress();
+ result.setStatus(jobProgress.getStatus().name());
+ result.setDatabaseType(jobProgress.getDatabaseType());
+ result.setInventory(getYamlInventory(jobProgress.getInventoryTaskProgressMap()));
+ result.setIncremental(getYamlIncremental(jobProgress.getIncrementalTaskProgressMap()));
+ return result;
+ }
+
+ private YamlJobProgress.YamlInventory getYamlInventory(final Map<String, InventoryTaskProgress> inventoryTaskProgressMap) {
+ YamlJobProgress.YamlInventory result = new YamlJobProgress.YamlInventory();
+ result.setFinished(getFinished(inventoryTaskProgressMap));
+ result.setUnfinished(getUnfinished(inventoryTaskProgressMap));
+ return result;
+ }
+
+ private String[] getFinished(final Map<String, InventoryTaskProgress> inventoryTaskProgressMap) {
+ return inventoryTaskProgressMap.entrySet().stream()
+ .filter(entry -> entry.getValue().getPosition() instanceof FinishedPosition)
+ .map(Entry::getKey)
+ .toArray(String[]::new);
+ }
+
+ private Map<String, String> getUnfinished(final Map<String, InventoryTaskProgress> inventoryTaskProgressMap) {
+ return inventoryTaskProgressMap.entrySet().stream()
+ .filter(entry -> !(entry.getValue().getPosition() instanceof FinishedPosition))
+ .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getPosition().toString()));
+ }
+
+ private Map<String, YamlJobProgress.YamlIncremental> getYamlIncremental(final Map<String, IncrementalTaskProgress> incrementalTaskProgressMap) {
+ return incrementalTaskProgressMap.entrySet().stream()
+ .collect(Collectors.toMap(Entry::getKey, entry -> {
+ YamlJobProgress.YamlIncremental yamlIncremental = new YamlJobProgress.YamlIncremental();
+ yamlIncremental.setPosition(entry.getValue().getPosition().toString());
+ yamlIncremental.setDelay(entry.getValue().getIncrementalTaskDelay());
+ return yamlIncremental;
+ }));
+ }
+
+ /**
+ * Swap to object.
+ *
+ * @param yamlJobProgress yaml job progress
+ * @return job progress
+ */
+ public JobProgress swapToObject(final YamlJobProgress yamlJobProgress) {
+ JobProgress result = new JobProgress();
+ result.setStatus(JobStatus.valueOf(yamlJobProgress.getStatus()));
+ result.setDatabaseType(yamlJobProgress.getDatabaseType());
+ result.setInventoryTaskProgressMap(getInventoryTaskProgressMap(yamlJobProgress.getInventory()));
+ result.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(yamlJobProgress.getDatabaseType(), yamlJobProgress.getIncremental()));
+ return result;
+ }
+
+ private Map<String, InventoryTaskProgress> getInventoryTaskProgressMap(final YamlJobProgress.YamlInventory inventory) {
+ Map<String, InventoryTaskProgress> result = Maps.newHashMap();
+ result.putAll(Arrays.stream(inventory.getFinished())
+ .collect(Collectors.toMap(each -> each, each -> new InventoryTaskProgress(new FinishedPosition()))));
+ result.putAll(inventory.getUnfinished().entrySet().stream()
+ .collect(Collectors.toMap(Entry::getKey, getInventoryTaskProgressFunction())));
+ return result;
+ }
+
+ private Function<Entry<String, String>, InventoryTaskProgress> getInventoryTaskProgressFunction() {
+ return entry -> new InventoryTaskProgress(Strings.isNullOrEmpty(entry.getValue()) ? new PlaceholderPosition() : PrimaryKeyPosition.init(entry.getValue()));
+ }
+
+ private Map<String, IncrementalTaskProgress> getIncrementalTaskProgressMap(final String databaseType, final Map<String, YamlJobProgress.YamlIncremental> incremental) {
+ return incremental.entrySet().stream()
+ .collect(Collectors.toMap(Entry::getKey, getIncrementalTaskProgressFunction(databaseType)));
+ }
+
+ private Function<Entry<String, YamlJobProgress.YamlIncremental>, IncrementalTaskProgress> getIncrementalTaskProgressFunction(final String databaseType) {
+ return entry -> new IncrementalTaskProgress(PositionInitializerFactory.newInstance(databaseType).init(entry.getValue().getPosition()), entry.getValue().getDelay());
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/yaml/YamlJobProgress.java
similarity index 54%
copy from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/yaml/YamlJobProgress.java
index 087c8db..c27d949 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/progress/yaml/YamlJobProgress.java
@@ -15,33 +15,44 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.postgresql.wal;
+package org.apache.shardingsphere.scaling.core.job.progress.yaml;
-import lombok.AllArgsConstructor;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import lombok.Setter;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
-import org.postgresql.replication.LogSequenceNumber;
+import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskDelay;
+
+import java.util.Map;
/**
- * PostgreSQL wal position.
+ * Yaml job progress.
*/
-@RequiredArgsConstructor
-@AllArgsConstructor
@Getter
@Setter
-public final class WalPosition implements Position<WalPosition> {
+public final class YamlJobProgress {
+
+ private String status;
+
+ private String databaseType;
- private final LogSequenceNumber logSequenceNumber;
+ private YamlInventory inventory;
- private long delay;
+ private Map<String, YamlIncremental> incremental;
+
+ @Getter
+ @Setter
+ public static final class YamlInventory {
+
+ private String[] finished;
+
+ private Map<String, String> unfinished;
+ }
- @Override
- public int compareTo(final WalPosition position) {
- if (null == position) {
- return 1;
- }
- return Long.compare(logSequenceNumber.asLong(), position.logSequenceNumber.asLong());
+ @Getter
+ @Setter
+ public static final class YamlIncremental {
+
+ private String position;
+
+ private IncrementalTaskDelay delay;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
index d419ec2..011542e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
@@ -50,7 +50,7 @@ public interface ScalingEntry extends DatabaseTypeAwareSPI {
*
* @return position initializer type
*/
- Class<? extends PositionInitializer<?>> getPositionInitializer();
+ Class<? extends PositionInitializer> getPositionInitializer();
/**
* Get importer type.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ReflectionUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ReflectionUtil.java
index 742e5f4..61e12a0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ReflectionUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ReflectionUtil.java
@@ -17,15 +17,12 @@
package org.apache.shardingsphere.scaling.core.util;
-import com.google.common.base.Preconditions;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
/**
* Reflection utils.
@@ -108,16 +105,4 @@ public final class ReflectionUtil {
method.setAccessible(true);
method.invoke(target, parameterValues);
}
-
- /**
- * Get interface generic class.
- *
- * @param clazz class
- * @return generic class
- */
- public static Class<?> getInterfaceGenericClass(final Class<?> clazz) {
- Type[] types = clazz.getGenericInterfaces();
- Preconditions.checkState(types.length == 1, "Only supported one generic type");
- return (Class<?>) ((ParameterizedType) types[0]).getActualTypeArguments()[0];
- }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
index 2a92275..8ab5162 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
@@ -39,7 +39,7 @@ public final class FixtureH2ScalingEntry implements ScalingEntry {
}
@Override
- public Class<? extends PositionInitializer<?>> getPositionInitializer() {
+ public Class<? extends PositionInitializer> getPositionInitializer() {
return FixturePositionInitializer.class;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionInitializer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionInitializer.java
index ac58683..074c5a1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionInitializer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionInitializer.java
@@ -22,10 +22,15 @@ import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
import javax.sql.DataSource;
-public final class FixturePositionInitializer implements PositionInitializer<PlaceholderPosition> {
+public final class FixturePositionInitializer implements PositionInitializer {
@Override
public PlaceholderPosition init(final DataSource dataSource) {
return new PlaceholderPosition();
}
+
+ @Override
+ public PlaceholderPosition init(final String data) {
+ return new PlaceholderPosition();
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPositionTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPositionTest.java
index 5dc9f09..e57c5bb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPositionTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPositionTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.scaling.core.job.position;
-import com.google.gson.Gson;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
@@ -25,8 +24,6 @@ import static org.junit.Assert.assertThat;
public final class PlaceholderPositionTest {
- public static final Gson GSON = new Gson();
-
@Test
public void assertCompareTo() {
PlaceholderPosition position1 = new PlaceholderPosition();
@@ -35,8 +32,7 @@ public final class PlaceholderPositionTest {
}
@Test
- public void assertTypeAdapter() {
- PlaceholderPosition position = GSON.fromJson("[]", PlaceholderPosition.class);
- assertThat(GSON.toJson(position), is("[]"));
+ public void assertToString() {
+ assertThat(new PlaceholderPosition().toString(), is(""));
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionTest.java
index ae1e0ab..2fbc79e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.scaling.core.job.position;
-import com.google.gson.Gson;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
@@ -35,15 +34,14 @@ public final class PrimaryKeyPositionTest {
}
@Test
- public void assertFormJson() {
- PrimaryKeyPosition position = new Gson().fromJson("[1,100]", PrimaryKeyPosition.class);
+ public void assertInit() {
+ PrimaryKeyPosition position = PrimaryKeyPosition.init("1,100");
assertThat(position.getBeginValue(), is(1L));
assertThat(position.getEndValue(), is(100L));
}
@Test
- public void assertToJson() {
- PrimaryKeyPosition position = new PrimaryKeyPosition(1, 100);
- assertThat(new Gson().toJson(position), is("[1,100]"));
+ public void assertToString() {
+ assertThat(new PrimaryKeyPosition(1, 100).toString(), is("1,100"));
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgressTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgressTest.java
new file mode 100644
index 0000000..2a8facc
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/progress/JobProgressTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.progress;
+
+import com.google.common.collect.Maps;
+import org.apache.shardingsphere.scaling.core.job.JobStatus;
+import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public final class JobProgressTest {
+
+ @Test
+ public void assertInit() {
+ JobProgress jobProgress = JobProgress.init(mockJobProgressYamlString());
+ assertThat(jobProgress.getStatus(), is(JobStatus.RUNNING));
+ assertThat(jobProgress.getDatabaseType(), is("H2"));
+ assertThat(jobProgress.getInventoryTaskProgressMap().size(), is(4));
+ assertThat(jobProgress.getIncrementalTaskProgressMap().size(), is(1));
+ }
+
+ @Test
+ public void assertGetIncrementalPosition() {
+ JobProgress jobProgress = JobProgress.init(mockJobProgressYamlString());
+ assertTrue(jobProgress.getIncrementalPosition("ds0") instanceof PlaceholderPosition);
+ }
+
+ @Test
+ public void assertGetInventoryPosition() {
+ JobProgress jobProgress = JobProgress.init(mockJobProgressYamlString());
+ assertThat(jobProgress.getInventoryPosition("ds0").size(), is(2));
+ assertTrue(jobProgress.getInventoryPosition("ds0").get("ds0.t_1") instanceof FinishedPosition);
+ assertTrue(jobProgress.getInventoryPosition("ds1").get("ds1.t_1") instanceof PlaceholderPosition);
+ assertTrue(jobProgress.getInventoryPosition("ds1").get("ds1.t_2") instanceof PrimaryKeyPosition);
+ }
+
+ @Test
+ public void assertToString() {
+ JobProgress jobProgress = new JobProgress();
+ jobProgress.setStatus(JobStatus.RUNNING);
+ jobProgress.setDatabaseType("H2");
+ jobProgress.setIncrementalTaskProgressMap(mockIncrementalTaskProgressMap());
+ jobProgress.setInventoryTaskProgressMap(mockInventoryTaskProgressMap());
+ assertThat(jobProgress.toString(), is(mockJobProgressYamlString()));
+ }
+
+ private Map<String, IncrementalTaskProgress> mockIncrementalTaskProgressMap() {
+ Map<String, IncrementalTaskProgress> result = Maps.newHashMap();
+ result.put("ds0", new IncrementalTaskProgress(new PlaceholderPosition()));
+ return result;
+ }
+
+ private Map<String, InventoryTaskProgress> mockInventoryTaskProgressMap() {
+ Map<String, InventoryTaskProgress> result = Maps.newHashMap();
+ result.put("ds0.t_1", new InventoryTaskProgress(new FinishedPosition()));
+ result.put("ds0.t_2", new InventoryTaskProgress(new FinishedPosition()));
+ result.put("ds1.t_1", new InventoryTaskProgress(new PlaceholderPosition()));
+ result.put("ds1.t_2", new InventoryTaskProgress(new PrimaryKeyPosition(1, 2)));
+ return result;
+ }
+
+ private String mockJobProgressYamlString() {
+ return "databaseType: H2\n"
+ + "incremental:\n"
+ + " ds0:\n"
+ + " delay:\n"
+ + " delayMilliseconds: -1\n"
+ + " lastEventTimestamps: 0\n"
+ + " position: ''\n"
+ + "inventory:\n"
+ + " finished:\n"
+ + " - ds0.t_2\n"
+ + " - ds0.t_1\n"
+ + " unfinished:\n"
+ + " ds1.t_2: 1,2\n"
+ + " ds1.t_1: ''\n"
+ + "status: RUNNING\n";
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/logback-test.xml b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/logback-test.xml
index be2ff44..f391881 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/logback-test.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/logback-test.xml
@@ -26,8 +26,9 @@
<appender-ref ref="console" />
</logger>
<logger name="com.zaxxer.hikari" level="warn" />
+ <logger name="org.apache.zookeeper.ClientCnxn" level="off" />
<logger name="org.apache.shardingsphere.scaling.core.job.preparer.splitter.InventoryTaskSplitter" level="off" />
-
+
<root>
<level value="info" />
<appender-ref ref="console" />
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
index b303d17..85ca7d3 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
@@ -17,10 +17,10 @@
package org.apache.shardingsphere.scaling.mysql;
+import org.apache.shardingsphere.scaling.core.common.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.executor.dumper.JDBCDumper;
import org.apache.shardingsphere.scaling.core.executor.dumper.LogDumper;
import org.apache.shardingsphere.scaling.core.executor.importer.Importer;
-import org.apache.shardingsphere.scaling.core.common.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
@@ -49,7 +49,7 @@ public final class MySQLScalingEntry implements ScalingEntry {
}
@Override
- public Class<? extends PositionInitializer<?>> getPositionInitializer() {
+ public Class<? extends PositionInitializer> getPositionInitializer() {
return MySQLPositionInitializer.class;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
index a069f66..edc4c48 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
@@ -36,9 +36,7 @@ public final class BinlogPosition implements Position<BinlogPosition> {
private final long position;
- private transient long serverId;
-
- private long delay;
+ private long serverId;
@Override
public int compareTo(final BinlogPosition position) {
@@ -51,4 +49,9 @@ public final class BinlogPosition implements Position<BinlogPosition> {
private long toLong() {
return Long.parseLong(filename.substring(filename.lastIndexOf('.') + 1)) << 32 | position;
}
+
+ @Override
+ public String toString() {
+ return String.format("%s#%d", filename, position);
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLBinlogDumper.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLBinlogDumper.java
index 1bcc5d7..d298735 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLBinlogDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLBinlogDumper.java
@@ -159,16 +159,14 @@ public final class MySQLBinlogDumper extends AbstractScalingExecutor implements
}
private DataRecord createDataRecord(final AbstractRowsEvent rowsEvent, final int columnCount) {
- long delay = System.currentTimeMillis() - rowsEvent.getTimestamp() * 1000;
- DataRecord result = new DataRecord(new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId(), delay), columnCount);
+ DataRecord result = new DataRecord(new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId()), columnCount);
result.setTableName(dumperConfig.getTableNameMap().get(rowsEvent.getTableName()));
result.setCommitTime(rowsEvent.getTimestamp() * 1000);
return result;
}
private void createPlaceholderRecord(final AbstractBinlogEvent event) {
- long delay = System.currentTimeMillis() - event.getTimestamp() * 1000;
- PlaceholderRecord record = new PlaceholderRecord(new BinlogPosition(event.getFileName(), event.getPosition(), event.getServerId(), delay));
+ PlaceholderRecord record = new PlaceholderRecord(new BinlogPosition(event.getFileName(), event.getPosition(), event.getServerId()));
record.setCommitTime(event.getTimestamp() * 1000);
pushRecord(record);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializer.java
index b4eae3e..d5ee6e0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializer.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.mysql.component;
+import com.google.common.base.Preconditions;
import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
@@ -29,7 +30,7 @@ import java.sql.SQLException;
/**
* MySQL binlog position initializer.
*/
-public final class MySQLPositionInitializer implements PositionInitializer<BinlogPosition> {
+public final class MySQLPositionInitializer implements PositionInitializer {
@Override
public BinlogPosition init(final DataSource dataSource) throws SQLException {
@@ -40,6 +41,13 @@ public final class MySQLPositionInitializer implements PositionInitializer<Binlo
}
}
+ @Override
+ public BinlogPosition init(final String data) {
+ String[] array = data.split("#");
+ Preconditions.checkArgument(array.length == 2, "Unknown binlog position: " + data);
+ return new BinlogPosition(array[0], Long.parseLong(array[1]));
+ }
+
private BinlogPosition getBinlogPosition(final Connection connection) throws SQLException {
try (PreparedStatement preparedStatement = connection.prepareStatement("SHOW MASTER STATUS");
ResultSet resultSet = preparedStatement.executeQuery()) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPositionTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPositionTest.java
index 75aedce..7b3d68c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPositionTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPositionTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.scaling.mysql.binlog;
-import com.google.gson.Gson;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
@@ -41,8 +40,7 @@ public final class BinlogPositionTest {
}
@Test
- public void assertToJson() {
- BinlogPosition binlogPosition = new BinlogPosition("mysql-bin.000001", 4);
- assertThat(new Gson().toJson(binlogPosition), is("{\"filename\":\"mysql-bin.000001\",\"position\":4,\"delay\":0}"));
+ public void assertToString() {
+ assertThat(new BinlogPosition("mysql-bin.000001", 4).toString(), is("mysql-bin.000001#4"));
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
index fc506ed..3b9d862 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
@@ -49,7 +49,7 @@ public final class PostgreSQLScalingEntry implements ScalingEntry {
}
@Override
- public Class<? extends PositionInitializer<?>> getPositionInitializer() {
+ public Class<? extends PositionInitializer> getPositionInitializer() {
return PostgreSQLPositionInitializer.class;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
index 11e84db..457bdc3 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
@@ -31,7 +31,7 @@ import java.sql.SQLException;
/**
* PostgreSQL wal position initializer.
*/
-public final class PostgreSQLPositionInitializer implements PositionInitializer<WalPosition> {
+public final class PostgreSQLPositionInitializer implements PositionInitializer {
public static final String SLOT_NAME = "sharding_scaling";
@@ -47,6 +47,11 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer<
}
}
+ @Override
+ public WalPosition init(final String data) {
+ return new WalPosition(LogSequenceNumber.valueOf(Long.parseLong(data)));
+ }
+
private void createIfNotExists(final Connection connection) throws SQLException {
try (PreparedStatement ps = connection.prepareStatement(String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", SLOT_NAME, DECODE_PLUGIN))) {
ps.execute();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
index 087c8db..07ef76c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
@@ -17,10 +17,8 @@
package org.apache.shardingsphere.scaling.postgresql.wal;
-import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import lombok.Setter;
import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.postgresql.replication.LogSequenceNumber;
@@ -28,15 +26,11 @@ import org.postgresql.replication.LogSequenceNumber;
* PostgreSQL wal position.
*/
@RequiredArgsConstructor
-@AllArgsConstructor
@Getter
-@Setter
public final class WalPosition implements Position<WalPosition> {
private final LogSequenceNumber logSequenceNumber;
- private long delay;
-
@Override
public int compareTo(final WalPosition position) {
if (null == position) {
@@ -44,4 +38,9 @@ public final class WalPosition implements Position<WalPosition> {
}
return Long.compare(logSequenceNumber.asLong(), position.logSequenceNumber.asLong());
}
+
+ @Override
+ public String toString() {
+ return String.valueOf(logSequenceNumber.asLong());
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java
index 967622f..9b2c590 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.scaling.postgresql.wal;
-import com.google.gson.Gson;
import org.junit.Test;
import org.postgresql.replication.LogSequenceNumber;
@@ -26,8 +25,6 @@ import static org.junit.Assert.assertThat;
public final class WalPositionTest {
- private static final Gson GSON = new Gson();
-
@Test
public void assertCompareTo() {
WalPosition walPosition = new WalPosition(LogSequenceNumber.valueOf(100L));
@@ -36,15 +33,7 @@ public final class WalPositionTest {
}
@Test
- public void assertToJson() {
- WalPosition walPosition = new WalPosition(LogSequenceNumber.valueOf(100L));
- assertThat(GSON.toJson(walPosition), is("{\"logSequenceNumber\":{\"value\":100},\"delay\":0}"));
- }
-
- @Test
- public void assertFromJson() {
- WalPosition walPosition = GSON.fromJson("{\"logSequenceNumber\":{\"value\":100},\"delay\":0}", WalPosition.class);
- assertThat(walPosition.getLogSequenceNumber().asLong(), is(100L));
- assertThat(walPosition.getDelay(), is(0L));
+ public void assertToString() {
+ assertThat(new WalPosition(LogSequenceNumber.valueOf(100L)).toString(), is("100"));
}
}