You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/02 14:25:42 UTC

[flink] 01/02: [hotfix][table-common] Enable finding multiple matching TableFactories from TableFactoryService

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a224b3e0b2e11fcd580edb5f8060f7ffd3c61a7
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jun 19 15:00:02 2019 +0200

    [hotfix][table-common] Enable finding multiple matching TableFactories from TableFactoryService
    
    This commit adds methods TableFactoryService.findAll that return TableFactories even if they are ambiguous based on the requiredContext and supportedProperties. Additionally, it fixes minor issues and improves type handling.
---
 .../table/api/AmbiguousTableFactoryException.java  |  36 +++--
 .../apache/flink/table/factories/TableFactory.java |   1 -
 .../flink/table/factories/TableFactoryService.java | 146 ++++++++++++++-------
 .../factories/TableFormatFactoryServiceTest.scala  |   7 -
 4 files changed, 117 insertions(+), 73 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/AmbiguousTableFactoryException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/AmbiguousTableFactoryException.java
index 395d5c98..ea1c72e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/AmbiguousTableFactoryException.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/AmbiguousTableFactoryException.java
@@ -31,20 +31,20 @@ import java.util.stream.Collectors;
 public class AmbiguousTableFactoryException extends RuntimeException {
 
 	// factories that match the properties
-	private final List<TableFactory> matchingFactories;
+	private final List<? extends TableFactory> matchingFactories;
 	// required factory class
-	private final Class<?> factoryClass;
+	private final Class<? extends TableFactory> factoryClass;
 	// all found factories
 	private final List<TableFactory> factories;
 	// properties that describe the configuration
 	private final Map<String, String> properties;
 
 	public AmbiguousTableFactoryException(
-		List<TableFactory> matchingFactories,
-		Class<?> factoryClass,
-		List<TableFactory> factories,
-		Map<String, String> properties,
-		Throwable cause) {
+			List<? extends TableFactory> matchingFactories,
+			Class<? extends TableFactory> factoryClass,
+			List<TableFactory> factories,
+			Map<String, String> properties,
+			Throwable cause) {
 
 		super(cause);
 		this.matchingFactories = matchingFactories;
@@ -54,10 +54,10 @@ public class AmbiguousTableFactoryException extends RuntimeException {
 	}
 
 	public AmbiguousTableFactoryException(
-		List<TableFactory> matchingFactories,
-		Class<?> factoryClass,
-		List<TableFactory> factories,
-		Map<String, String> properties) {
+			List<? extends TableFactory> matchingFactories,
+			Class<? extends TableFactory> factoryClass,
+			List<TableFactory> factories,
+			Map<String, String> properties) {
 
 		this(matchingFactories, factoryClass, factories, properties, null);
 	}
@@ -70,15 +70,13 @@ public class AmbiguousTableFactoryException extends RuntimeException {
 				"The following properties are requested:\n%s\n\n" +
 				"The following factories have been considered:\n%s",
 			factoryClass.getName(),
-			String.join(
-				"\n",
-				matchingFactories.stream()
-					.map(p -> p.getClass().getName()).collect(Collectors.toList())),
+			matchingFactories.stream()
+				.map(p -> p.getClass().getName())
+				.collect(Collectors.joining("\n")),
 			DescriptorProperties.toString(properties),
-			String.join(
-				"\n",
-				factories.stream().map(p -> p.getClass().getName()).collect(Collectors.toList())
-			)
+			factories.stream()
+				.map(p -> p.getClass().getName())
+				.collect(Collectors.joining("\n"))
 		);
 	}
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactory.java
index b56ffa1..d4c0628 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactory.java
@@ -55,7 +55,6 @@ public interface TableFactory {
 	 */
 	Map<String, String> requiredContext();
 
-
 	/**
 	 * List of property keys that this factory can handle. This method will be used for validation.
 	 * If a property is passed that this factory cannot handle, an exception will be thrown. The
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java
index ba6eec4..082c37d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java
@@ -62,9 +62,9 @@ public class TableFactoryService {
 	 * @param <T> factory class type
 	 * @return the matching factory
 	 */
-	public static <T> T find(Class<T> factoryClass, Descriptor descriptor) {
+	public static <T extends TableFactory> T find(Class<T> factoryClass, Descriptor descriptor) {
 		Preconditions.checkNotNull(descriptor);
-		return findInternal(factoryClass, descriptor.toProperties(), Optional.empty());
+		return findSingleInternal(factoryClass, descriptor.toProperties(), Optional.empty());
 	}
 
 	/**
@@ -76,10 +76,13 @@ public class TableFactoryService {
 	 * @param <T> factory class type
 	 * @return the matching factory
 	 */
-	public static <T> T find(Class<T> factoryClass, Descriptor descriptor, ClassLoader classLoader) {
+	public static <T extends TableFactory> T find(
+			Class<T> factoryClass,
+			Descriptor descriptor,
+			ClassLoader classLoader) {
 		Preconditions.checkNotNull(descriptor);
 		Preconditions.checkNotNull(classLoader);
-		return findInternal(factoryClass, descriptor.toProperties(), Optional.of(classLoader));
+		return findSingleInternal(factoryClass, descriptor.toProperties(), Optional.of(classLoader));
 	}
 
 	/**
@@ -90,8 +93,8 @@ public class TableFactoryService {
 	 * @param <T> factory class type
 	 * @return the matching factory
 	 */
-	public static <T> T find(Class<T> factoryClass, Map<String, String> propertyMap) {
-		return findInternal(factoryClass, propertyMap, Optional.empty());
+	public static <T extends TableFactory> T find(Class<T> factoryClass, Map<String, String> propertyMap) {
+		return findSingleInternal(factoryClass, propertyMap, Optional.empty());
 	}
 
 	/**
@@ -103,9 +106,52 @@ public class TableFactoryService {
 	 * @param <T> factory class type
 	 * @return the matching factory
 	 */
-	public static <T> T find(Class<T> factoryClass, Map<String, String> propertyMap, ClassLoader classLoader) {
+	public static <T extends TableFactory> T find(
+			Class<T> factoryClass,
+			Map<String, String> propertyMap,
+			ClassLoader classLoader) {
 		Preconditions.checkNotNull(classLoader);
-		return findInternal(factoryClass, propertyMap, Optional.of(classLoader));
+		return findSingleInternal(factoryClass, propertyMap, Optional.of(classLoader));
+	}
+
+	/**
+	 * Finds all table factories of the given class and property map.
+	 *
+	 * @param factoryClass desired factory class
+	 * @param propertyMap properties that describe the factory configuration
+	 * @param <T> factory class type
+	 * @return all the matching factories
+	 */
+	public static <T extends TableFactory> List<T> findAll(Class<T> factoryClass, Map<String, String> propertyMap) {
+		return findAllInternal(factoryClass, propertyMap, Optional.empty());
+	}
+
+	/**
+	 * Finds a table factory of the given class, property map, and classloader.
+	 *
+	 * @param factoryClass desired factory class
+	 * @param properties properties that describe the factory configuration
+	 * @param classLoader classloader for service loading
+	 * @param <T> factory class type
+	 * @return the matching factory
+	 */
+	private static <T extends TableFactory> T findSingleInternal(
+			Class<T> factoryClass,
+			Map<String, String> properties,
+			Optional<ClassLoader> classLoader) {
+
+		List<TableFactory> tableFactories = discoverFactories(classLoader);
+		List<T> filtered = filter(tableFactories, factoryClass, properties);
+
+		if (filtered.size() > 1) {
+			throw new AmbiguousTableFactoryException(
+				filtered,
+				factoryClass,
+				tableFactories,
+				properties);
+		} else {
+			return filtered.get(0);
+		}
 	}
 
 	/**
@@ -117,19 +163,32 @@ public class TableFactoryService {
 	 * @param <T> factory class type
 	 * @return the matching factory
 	 */
-	public static <T> T findInternal(Class<T> factoryClass, Map<String, String> properties, Optional<ClassLoader> classLoader) {
+	private static <T extends TableFactory> List<T> findAllInternal(
+			Class<T> factoryClass,
+			Map<String, String> properties,
+			Optional<ClassLoader> classLoader) {
+
+		List<TableFactory> tableFactories = discoverFactories(classLoader);
+		return filter(tableFactories, factoryClass, properties);
+	}
+
+	/**
+	 * Filters found factories by factory class and with matching context.
+	 */
+	private static <T extends TableFactory> List<T> filter(
+			List<TableFactory> foundFactories,
+			Class<T> factoryClass,
+			Map<String, String> properties) {
 
 		Preconditions.checkNotNull(factoryClass);
 		Preconditions.checkNotNull(properties);
 
-		List<TableFactory> foundFactories = discoverFactories(classLoader);
-
-		List<TableFactory> classFactories = filterByFactoryClass(
+		List<T> classFactories = filterByFactoryClass(
 			factoryClass,
 			properties,
 			foundFactories);
 
-		List<TableFactory> contextFactories = filterByContext(
+		List<T> contextFactories = filterByContext(
 			factoryClass,
 			properties,
 			foundFactories,
@@ -169,10 +228,11 @@ public class TableFactoryService {
 	/**
 	 * Filters factories with matching context by factory class.
 	 */
-	private static <T> List<TableFactory> filterByFactoryClass(
-		Class<T> factoryClass,
-		Map<String, String> properties,
-		List<TableFactory> foundFactories) {
+	@SuppressWarnings("unchecked")
+	private static <T> List<T> filterByFactoryClass(
+			Class<T> factoryClass,
+			Map<String, String> properties,
+			List<TableFactory> foundFactories) {
 
 		List<TableFactory> classFactories = foundFactories.stream()
 			.filter(p -> factoryClass.isAssignableFrom(p.getClass()))
@@ -186,7 +246,7 @@ public class TableFactoryService {
 				properties);
 		}
 
-		return classFactories;
+		return (List<T>) classFactories;
 	}
 
 	/**
@@ -194,13 +254,13 @@ public class TableFactoryService {
 	 *
 	 * @return all matching factories
 	 */
-	private static <T> List<TableFactory> filterByContext(
-		Class<T> factoryClass,
-		Map<String, String> properties,
-		List<TableFactory> foundFactories,
-		List<TableFactory> classFactories) {
+	private static <T extends TableFactory> List<T> filterByContext(
+			Class<T> factoryClass,
+			Map<String, String> properties,
+			List<TableFactory> foundFactories,
+			List<T> classFactories) {
 
-		List<TableFactory> matchingFactories = classFactories.stream().filter(factory -> {
+		List<T> matchingFactories = classFactories.stream().filter(factory -> {
 			Map<String, String> requestedContext = normalizeContext(factory);
 
 			Map<String, String> plainContext = new HashMap<>(requestedContext);
@@ -214,7 +274,9 @@ public class TableFactoryService {
 			plainContext.remove(org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION);
 
 			// check if required context is met
-			return plainContext.keySet().stream().allMatch(e -> properties.containsKey(e) && properties.get(e).equals(plainContext.get(e)));
+			return plainContext.keySet()
+				.stream()
+				.allMatch(e -> properties.containsKey(e) && properties.get(e).equals(plainContext.get(e)));
 		}).collect(Collectors.toList());
 
 		if (matchingFactories.isEmpty()) {
@@ -238,17 +300,17 @@ public class TableFactoryService {
 				String.format("Required context of factory '%s' must not be null.", factory.getClass().getName()));
 		}
 		return requiredContext.keySet().stream()
-			.collect(Collectors.toMap(key -> key.toLowerCase(), key -> requiredContext.get(key)));
+			.collect(Collectors.toMap(String::toLowerCase, requiredContext::get));
 	}
 
 	/**
 	 * Filters the matching class factories by supported properties.
 	 */
-	private static <T> T filterBySupportedProperties(
-		Class<T> factoryClass,
-		Map<String, String> properties,
-		List<TableFactory> foundFactories,
-		List<TableFactory> classFactories) {
+	private static <T extends TableFactory> List<T> filterBySupportedProperties(
+			Class<T> factoryClass,
+			Map<String, String> properties,
+			List<TableFactory> foundFactories,
+			List<T> classFactories) {
 
 		final List<String> plainGivenKeys = new LinkedList<>();
 		properties.keySet().forEach(k -> {
@@ -261,8 +323,8 @@ public class TableFactoryService {
 		});
 
 		Optional<String> lastKey = Optional.empty();
-		List<TableFactory> supportedFactories = new LinkedList<>();
-		for (TableFactory factory: classFactories) {
+		List<T> supportedFactories = new LinkedList<>();
+		for (T factory: classFactories) {
 			Set<String> requiredContextKeys = normalizeContext(factory).keySet();
 			Tuple2<List<String>, List<String>> tuple2 = normalizeSupportedProperties(factory);
 			// ignore context keys
@@ -273,10 +335,10 @@ public class TableFactoryService {
 				factory,
 				givenContextFreeKeys);
 
-			Boolean allTrue = true;
+			boolean allTrue = true;
 			for (String k: givenFilteredKeys) {
 				lastKey = Optional.of(k);
-				if (!(tuple2.f0.contains(k) || tuple2.f1.stream().anyMatch(p -> k.startsWith(p)))) {
+				if (!(tuple2.f0.contains(k) || tuple2.f1.stream().anyMatch(k::startsWith))) {
 					allTrue = false;
 					break;
 				}
@@ -310,15 +372,9 @@ public class TableFactoryService {
 				factoryClass,
 				foundFactories,
 				properties);
-		} else if (supportedFactories.size() > 1) {
-			throw new AmbiguousTableFactoryException(
-				supportedFactories,
-				factoryClass,
-				foundFactories,
-				properties);
 		}
 
-		return (T) supportedFactories.get(0);
+		return supportedFactories;
 	}
 
 	/**
@@ -332,7 +388,7 @@ public class TableFactoryService {
 					factory.getClass().getName()));
 		}
 		List<String> supportedKeys = supportedProperties.stream()
-			.map(p -> p.toLowerCase())
+			.map(String::toLowerCase)
 			.collect(Collectors.toList());
 
 		// extract wildcard prefixes
@@ -353,9 +409,7 @@ public class TableFactoryService {
 	/**
 	 * Performs filtering for special cases (i.e. table format factories with schema derivation).
 	 */
-	private static List<String> filterSupportedPropertiesFactorySpecific(
-		TableFactory factory,
-		List<String> keys) {
+	private static List<String> filterSupportedPropertiesFactorySpecific(TableFactory factory, List<String> keys) {
 
 		if (factory instanceof TableFormatFactory) {
 			boolean includeSchema = ((TableFormatFactory) factory).supportsSchemaDerivation();
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/TableFormatFactoryServiceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/TableFormatFactoryServiceTest.scala
index ab5a051..6b695c7 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/TableFormatFactoryServiceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/TableFormatFactoryServiceTest.scala
@@ -94,13 +94,6 @@ class TableFormatFactoryServiceTest {
   }
 
   @Test(expected = classOf[NoMatchingTableFactoryException])
-  def testMissingClass(): Unit = {
-    val props = properties()
-    // this class is not a valid factory
-    TableFactoryService.find(classOf[TableFormatFactoryServiceTest], props)
-  }
-
-  @Test(expected = classOf[NoMatchingTableFactoryException])
   def testInvalidContext(): Unit = {
     val props = properties()
     // no context specifies this