You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by sm...@apache.org on 2020/01/17 15:35:02 UTC
[knox] branch master updated: KNOX-2186 - Advanced service
discovery configuration handling (#238)
This is an automated email from the ASF dual-hosted git repository.
smolnar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git
The following commit(s) were added to refs/heads/master by this push:
new 69b08af KNOX-2186 - Advanced service discovery configuration handling (#238)
69b08af is described below
commit 69b08afa9080c58ea83ee331f1e6e6b3c4be1392
Author: Sandor Molnar <sm...@apache.org>
AuthorDate: Fri Jan 17 16:34:53 2020 +0100
KNOX-2186 - Advanced service discovery configuration handling (#238)
* KNOX-2186 - Added support for services without url/version/parameters
* KNOX-2186 - Advanced service discovery configuration handling
---
.../ClouderaManagerIntegrationMessages.java | 6 ++
.../ClouderaManagerDescriptorMonitor.java | 25 ++++--
.../ClouderaManagerDescriptorParser.java | 94 +++++++++++++++-------
...vanceServiceDiscoveryConfigurationMessages.java | 36 +++++++++
.../advanced/AdvancedServiceDiscoveryConfig.java | 72 +++++++++++++++++
...vancedServiceDiscoveryConfigChangeListener.java | 28 +++++++
...vancedServiceDiscoveryConfigurationMonitor.java | 91 +++++++++++++++++++++
.../ClouderaManagerDescriptorParserTest.java | 61 ++++++++++++--
.../src/test/resources/testDescriptor.xml | 7 +-
...tDescriptorConfigurationWithWrongDescriptor.xml | 2 +-
.../org/apache/knox/gateway/GatewayServer.java | 8 +-
.../gateway/config/impl/GatewayConfigImpl.java | 7 ++
.../apache/knox/gateway/config/GatewayConfig.java | 5 ++
.../org/apache/knox/gateway/GatewayTestConfig.java | 5 ++
.../topology/simple/SimpleDescriptorImpl.java | 5 ++
15 files changed, 404 insertions(+), 48 deletions(-)
diff --git a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/ClouderaManagerIntegrationMessages.java b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/ClouderaManagerIntegrationMessages.java
index 1312bc8..4909533 100644
--- a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/ClouderaManagerIntegrationMessages.java
+++ b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/ClouderaManagerIntegrationMessages.java
@@ -44,4 +44,10 @@ public interface ClouderaManagerIntegrationMessages {
@Message(level = MessageLevel.ERROR, text = "Error while producing Knox descriptor: {0}")
void failedToProduceKnoxDescriptor(String errorMessage, @StackTrace(level = MessageLevel.DEBUG) Exception e);
+
+ @Message(level = MessageLevel.WARN, text = "Service {0} is disabled. It will NOT be added in {1}")
+ void serviceDisabled(String serviceName, String descriptorName);
+
+ @Message(level = MessageLevel.INFO, text = "Updated advanced service discovery configuration.")
+ void updatedAdvanceServiceDiscoverytConfiguration();
}
diff --git a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorMonitor.java b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorMonitor.java
index a727839..efc1471 100644
--- a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorMonitor.java
+++ b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorMonitor.java
@@ -24,6 +24,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
+import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -34,22 +35,25 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.knox.gateway.ClouderaManagerIntegrationMessages;
import org.apache.knox.gateway.config.GatewayConfig;
import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+import org.apache.knox.gateway.topology.discovery.advanced.AdvancedServiceDiscoveryConfigChangeListener;
import org.apache.knox.gateway.util.JsonUtils;
/**
* Monitoring KNOX_DESCRIPTOR_DIR for *.cm files - which is a Hadoop XML configuration - and processing those files if they were modified
* since the last time they were processed
*/
-public class ClouderaManagerDescriptorMonitor {
+public class ClouderaManagerDescriptorMonitor implements AdvancedServiceDiscoveryConfigChangeListener {
private static final String CM_DESCRIPTOR_FILE_EXTENSION = ".cm";
private static final ClouderaManagerIntegrationMessages LOG = MessagesFactory.get(ClouderaManagerIntegrationMessages.class);
private final String descriptorsDir;
private final long monitoringInterval;
private final ScheduledExecutorService executorService;
+ private final ClouderaManagerDescriptorParser cmDescriptorParser;
private FileTime lastReloadTime;
- public ClouderaManagerDescriptorMonitor(GatewayConfig gatewayConfig) {
+ public ClouderaManagerDescriptorMonitor(GatewayConfig gatewayConfig, ClouderaManagerDescriptorParser cmDescriptorParser) {
+ this.cmDescriptorParser = cmDescriptorParser;
this.descriptorsDir = gatewayConfig.getGatewayDescriptorsDir();
this.monitoringInterval = gatewayConfig.getClouderaManagerDescriptorsMonitoringInterval();
this.executorService = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("ClouderaManagerDescriptorMonitor-%d").build());
@@ -57,23 +61,23 @@ public class ClouderaManagerDescriptorMonitor {
public void setupMonitor() {
if (monitoringInterval > 0) {
- executorService.scheduleAtFixedRate(() -> monitorClouderaManagerDescriptors(), 0, monitoringInterval, TimeUnit.MILLISECONDS);
+ executorService.scheduleAtFixedRate(() -> monitorClouderaManagerDescriptors(false), 0, monitoringInterval, TimeUnit.MILLISECONDS);
LOG.monitoringClouderaManagerDescriptor(descriptorsDir);
}
}
- private void monitorClouderaManagerDescriptors() {
+ private void monitorClouderaManagerDescriptors(boolean force) {
final File[] clouderaManagerDescriptorFiles = new File(descriptorsDir).listFiles((FileFilter) new SuffixFileFilter(CM_DESCRIPTOR_FILE_EXTENSION));
for (File clouderaManagerDescriptorFile : clouderaManagerDescriptorFiles) {
- monitorClouderaManagerDescriptor(Paths.get(clouderaManagerDescriptorFile.getAbsolutePath()));
+ monitorClouderaManagerDescriptor(Paths.get(clouderaManagerDescriptorFile.getAbsolutePath()), force);
}
}
- private void monitorClouderaManagerDescriptor(Path clouderaManagerDescriptorFile) {
+ private void monitorClouderaManagerDescriptor(Path clouderaManagerDescriptorFile, boolean force) {
try {
if (Files.isReadable(clouderaManagerDescriptorFile)) {
final FileTime lastModifiedTime = Files.getLastModifiedTime(clouderaManagerDescriptorFile);
- if (lastReloadTime == null || lastReloadTime.compareTo(lastModifiedTime) < 0) {
+ if (force || lastReloadTime == null || lastReloadTime.compareTo(lastModifiedTime) < 0) {
lastReloadTime = lastModifiedTime;
processClouderaManagerDescriptor(clouderaManagerDescriptorFile.toString());
}
@@ -86,7 +90,7 @@ public class ClouderaManagerDescriptorMonitor {
}
private void processClouderaManagerDescriptor(String descriptorFilePath) {
- ClouderaManagerDescriptorParser.parse(descriptorFilePath).forEach(simpleDescriptor -> {
+ cmDescriptorParser.parse(descriptorFilePath).forEach(simpleDescriptor -> {
try {
final File knoxDescriptorFile = new File(descriptorsDir, simpleDescriptor.getName() + ".json");
FileUtils.writeStringToFile(knoxDescriptorFile, JsonUtils.renderAsJsonString(simpleDescriptor), StandardCharsets.UTF_8);
@@ -95,4 +99,9 @@ public class ClouderaManagerDescriptorMonitor {
}
});
}
+
+ @Override
+ public void onAdvancedServiceDiscoveryConfigurationChange(Properties newConfiguration) {
+ monitorClouderaManagerDescriptors(true);
+ }
}
diff --git a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorParser.java b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorParser.java
index 44075fa..2986136 100644
--- a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorParser.java
+++ b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorParser.java
@@ -19,18 +19,21 @@ package org.apache.knox.gateway.cm.descriptor;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.LinkedHashSet;
+import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.knox.gateway.ClouderaManagerIntegrationMessages;
import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+import org.apache.knox.gateway.topology.discovery.advanced.AdvancedServiceDiscoveryConfig;
+import org.apache.knox.gateway.topology.discovery.advanced.AdvancedServiceDiscoveryConfigChangeListener;
import org.apache.knox.gateway.topology.simple.SimpleDescriptor;
import org.apache.knox.gateway.topology.simple.SimpleDescriptorImpl;
import org.apache.knox.gateway.topology.simple.SimpleDescriptorImpl.ApplicationImpl;
import org.apache.knox.gateway.topology.simple.SimpleDescriptorImpl.ServiceImpl;
-public class ClouderaManagerDescriptorParser {
+public class ClouderaManagerDescriptorParser implements AdvancedServiceDiscoveryConfigChangeListener {
private static final ClouderaManagerIntegrationMessages log = MessagesFactory.get(ClouderaManagerIntegrationMessages.class);
private static final String CONFIG_NAME_DISCOVERY_TYPE = "discoveryType";
private static final String CONFIG_NAME_DISCOVERY_ADDRESS = "discoveryAddress";
@@ -42,6 +45,12 @@ public class ClouderaManagerDescriptorParser {
private static final String CONFIG_NAME_SERVICE_URL = "url";
private static final String CONFIG_NAME_SERVICE_VERSION = "version";
+ private AdvancedServiceDiscoveryConfig advancedServiceDiscoveryConfig;
+
+ public ClouderaManagerDescriptorParser() {
+ advancedServiceDiscoveryConfig = new AdvancedServiceDiscoveryConfig();
+ }
+
/**
* Produces a set of {@link SimpleDescriptor}s from the specified file.
*
@@ -49,7 +58,7 @@ public class ClouderaManagerDescriptorParser {
* The path to the configuration file which holds descriptor information in a pre-defined format.
* @return A set of {@link SimpleDescriptor}s based on the contents of the given file.
*/
- public static Set<SimpleDescriptor> parse(String path) {
+ public Set<SimpleDescriptor> parse(String path) {
try {
log.parseClouderaManagerDescriptor(path);
final Configuration xmlConfiguration = new Configuration(false);
@@ -64,7 +73,7 @@ public class ClouderaManagerDescriptorParser {
}
}
- private static Set<SimpleDescriptor> parseXmlConfig(Configuration xmlConfiguration) {
+ private Set<SimpleDescriptor> parseXmlConfig(Configuration xmlConfiguration) {
final Set<SimpleDescriptor> descriptors = new LinkedHashSet<>();
xmlConfiguration.forEach(xmlDescriptor -> {
SimpleDescriptor descriptor = parseXmlDescriptor(xmlDescriptor.getKey(), xmlDescriptor.getValue());
@@ -75,7 +84,7 @@ public class ClouderaManagerDescriptorParser {
return descriptors;
}
- private static SimpleDescriptor parseXmlDescriptor(String name, String xmlValue) {
+ private SimpleDescriptor parseXmlDescriptor(String name, String xmlValue) {
try {
final SimpleDescriptorImpl descriptor = new SimpleDescriptorImpl();
descriptor.setName(name);
@@ -111,13 +120,29 @@ public class ClouderaManagerDescriptorParser {
break;
}
}
+ if (advancedServiceDiscoveryConfig.getExpectedTopologyNames().contains(name)) {
+ addEnabledServices(descriptor);
+ }
return descriptor;
- } catch(Exception e) {
+ } catch (Exception e) {
log.failedToParseDescriptor(name, e.getMessage(), e);
return null;
}
}
+ /*
+ * Adds any enabled service which is not listed in the CM descriptor
+ */
+ private void addEnabledServices(SimpleDescriptorImpl descriptor) {
+ advancedServiceDiscoveryConfig.getEnabledServiceNames().forEach(enabledServiceName -> {
+ if(descriptor.getService(enabledServiceName) == null) {
+ ServiceImpl service = new ServiceImpl();
+ service.setName(enabledServiceName);
+ descriptor.addService(service);
+ }
+ });
+ }
+
/**
* An application consists of the following parts: <code>app:$APPLICATION_NAME[:$PARAMETER_NAME=$PARAMETER_VALUE]</code>. Parameters are
* optional. <br>
@@ -127,7 +152,7 @@ public class ClouderaManagerDescriptorParser {
* <li>app:knoxauth:param1.name=param1.value</li>
* </ul>
*/
- private static void parseApplication(SimpleDescriptorImpl descriptor, String configurationPair) {
+ private void parseApplication(SimpleDescriptorImpl descriptor, String configurationPair) {
final String[] applicationParts = configurationPair.split(":");
final String applicationName = applicationParts[1].trim();
ApplicationImpl application = (ApplicationImpl) descriptor.getApplication(applicationName);
@@ -148,6 +173,7 @@ public class ClouderaManagerDescriptorParser {
/**
* A service consists of the following parts:
* <ul>
+ * <li><code>$SERVICE_NAME</code></li>
* <li><code>$SERVICE_NAME:url=$URL</code></li>
* <li><code>$SERVICE_NAME:version=$VERSION</code> (optional)</li>
* <li><code>$SERVICE_NAME[:$PARAMETER_NAME=$PARAMETER_VALUE] (optional)</code></li>
@@ -159,32 +185,44 @@ public class ClouderaManagerDescriptorParser {
* <li>HIVE:param1.name=param1.value</li>
* </ul>
*/
- private static void parseService(SimpleDescriptorImpl descriptor, String configurationPair) {
+ private void parseService(SimpleDescriptorImpl descriptor, String configurationPair) {
final String[] serviceParts = configurationPair.split(":");
final String serviceName = serviceParts[0].trim();
- ServiceImpl service = (ServiceImpl) descriptor.getService(serviceName);
- if (service == null) {
- service = new ServiceImpl();
- service.setName(serviceName);
- descriptor.addService(service);
- }
+ if (advancedServiceDiscoveryConfig.isServiceEnabled(serviceName)) {
+ ServiceImpl service = (ServiceImpl) descriptor.getService(serviceName);
+ if (service == null) {
+ service = new ServiceImpl();
+ service.setName(serviceName);
+ descriptor.addService(service);
+ }
- // configuration value may contain ":" (for instance http://host:port) -> considering a configuration name/value pair everything after '$SERVICE_NAME:'
- final String serviceConfiguration = configurationPair.substring(serviceName.length() + 1).trim();
- final String[] serviceConfigurationParts = serviceConfiguration.split("=", 2);
- final String serviceConfigurationName = serviceConfigurationParts[0].trim();
- final String serviceConfigurationValue = serviceConfigurationParts[1].trim();
- switch (serviceConfigurationName) {
- case CONFIG_NAME_SERVICE_URL:
- service.addUrl(serviceConfigurationValue);
- break;
- case CONFIG_NAME_SERVICE_VERSION:
- service.setVersion(serviceConfigurationValue);
- break;
- default:
- service.addParam(serviceConfigurationName, serviceConfigurationValue);
- break;
+ if (serviceParts.length > 1) {
+ // configuration value may contain ":" (for instance http://host:port) -> considering a configuration name/value pair everything after '$SERVICE_NAME:'
+ final String serviceConfiguration = configurationPair.substring(serviceName.length() + 1).trim();
+ final String[] serviceConfigurationParts = serviceConfiguration.split("=", 2);
+ final String serviceConfigurationName = serviceConfigurationParts[0].trim();
+ final String serviceConfigurationValue = serviceConfigurationParts[1].trim();
+ switch (serviceConfigurationName) {
+ case CONFIG_NAME_SERVICE_URL:
+ service.addUrl(serviceConfigurationValue);
+ break;
+ case CONFIG_NAME_SERVICE_VERSION:
+ service.setVersion(serviceConfigurationValue);
+ break;
+ default:
+ service.addParam(serviceConfigurationName, serviceConfigurationValue);
+ break;
+ }
+ }
+ } else {
+ log.serviceDisabled(serviceName, descriptor.getName());
}
}
+ @Override
+ public void onAdvancedServiceDiscoveryConfigurationChange(Properties newConfiguration) {
+ advancedServiceDiscoveryConfig = new AdvancedServiceDiscoveryConfig(newConfiguration);
+ log.updatedAdvanceServiceDiscoverytConfiguration();
+ }
+
}
diff --git a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvanceServiceDiscoveryConfigurationMessages.java b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvanceServiceDiscoveryConfigurationMessages.java
new file mode 100644
index 0000000..e51f0d2
--- /dev/null
+++ b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvanceServiceDiscoveryConfigurationMessages.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.knox.gateway.topology.discovery.advanced;
+
+import org.apache.knox.gateway.i18n.messages.Message;
+import org.apache.knox.gateway.i18n.messages.MessageLevel;
+import org.apache.knox.gateway.i18n.messages.Messages;
+import org.apache.knox.gateway.i18n.messages.StackTrace;
+
+@Messages(logger = "org.apache.knox.gateway.topology.discovery.advanced")
+public interface AdvanceServiceDiscoveryConfigurationMessages {
+
+ @Message(level = MessageLevel.INFO, text = "Monitoring {0} for changes.")
+ void monitorStarted(String path);
+
+ @Message(level = MessageLevel.ERROR, text = "Error while monitoring CM advanced configuration: {1}")
+ void failedToMonitorClouderaManagerAdvancedConfiguration(String errorMessage, @StackTrace(level = MessageLevel.DEBUG) Exception e);
+
+ @Message(level = MessageLevel.INFO, text = "Notifying listeners due to advanced service discovery configuration changes...")
+ void notifyListeners();
+
+}
diff --git a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfig.java b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfig.java
new file mode 100644
index 0000000..11fc56e
--- /dev/null
+++ b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfig.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.knox.gateway.topology.discovery.advanced;
+
+import static java.util.stream.Collectors.toSet;
+
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.stream.Stream;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Wrapper class providing useful methods on properties coming from
+ * <code>$KNOX_CONF_DIR/auto-discovery-advanced-configuration.properties</code>
+ */
+public class AdvancedServiceDiscoveryConfig {
+
+ public static final String PARAMETER_NAME_PREFIX_ENABLED_SERVICE = "gateway.auto.discovery.enabled.";
+ public static final String PARAMETER_NAME_EXPECTED_TOPOLOGIES = "gateway.auto.discovery.expected.topology.names";
+
+ private final Properties properties;
+
+ public AdvancedServiceDiscoveryConfig() {
+ this(null);
+ }
+
+ public AdvancedServiceDiscoveryConfig(Properties properties) {
+ this.properties = properties == null ? new Properties() : properties;
+ }
+
+ public boolean isServiceEnabled(String serviceName) {
+ return Boolean.valueOf(getPropertyIgnoreCase(PARAMETER_NAME_PREFIX_ENABLED_SERVICE + serviceName, "true"));
+ }
+
+ public Set<String> getEnabledServiceNames() {
+ return properties.entrySet().stream().filter(keyValuePair -> Boolean.valueOf((String) keyValuePair.getValue()))
+ .map(keyValuePair -> ((String) keyValuePair.getKey()).substring(PARAMETER_NAME_PREFIX_ENABLED_SERVICE.length()).toUpperCase(Locale.getDefault())).collect(toSet());
+ }
+
+ public Set<String> getExpectedTopologyNames() {
+ return Stream.of(properties.getProperty(PARAMETER_NAME_EXPECTED_TOPOLOGIES, "").split(",")).map(expectedToplogyName -> expectedToplogyName.trim()).collect(toSet());
+ }
+
+ private String getPropertyIgnoreCase(String propertyName, String defaultValue) {
+ final String property = properties.getProperty(propertyName);
+ if (property != null) {
+ return property;
+ } else {
+ for (Entry<Object, Object> entry : properties.entrySet()) {
+ if (propertyName.equalsIgnoreCase((String) entry.getKey())) {
+ return (String) entry.getValue();
+ }
+ }
+ return defaultValue;
+ }
+ }
+}
diff --git a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfigChangeListener.java b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfigChangeListener.java
new file mode 100644
index 0000000..21061ec
--- /dev/null
+++ b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfigChangeListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.knox.gateway.topology.discovery.advanced;
+
+import java.util.Properties;
+
+/**
+ * The listener interface for receiving service discovery configuration events.
+ */
+public interface AdvancedServiceDiscoveryConfigChangeListener {
+
+ void onAdvancedServiceDiscoveryConfigurationChange(Properties newConfiguration);
+
+}
diff --git a/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfigurationMonitor.java b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfigurationMonitor.java
new file mode 100644
index 0000000..2f17a06
--- /dev/null
+++ b/gateway-cm-integration/src/main/java/org/apache/knox/gateway/topology/discovery/advanced/AdvancedServiceDiscoveryConfigurationMonitor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.knox.gateway.topology.discovery.advanced;
+
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.FileTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+
+/**
+ * Monitors <code>$KNOX_CONF_DIR/auto-discovery-advanced-configuration.properties</code> (if it exists) and notifies any
+ * {@link AdvancedServiceDiscoveryConfigChangeListener} if the file has changed since the last time it was loaded
+ *
+ */
+public class AdvancedServiceDiscoveryConfigurationMonitor {
+
+ private static final String ADVANCED_CONFIGURATION_FILE_NAME = "auto-discovery-advanced-configuration.properties";
+ private static final AdvanceServiceDiscoveryConfigurationMessages LOG = MessagesFactory.get(AdvanceServiceDiscoveryConfigurationMessages.class);
+
+ private final List<AdvancedServiceDiscoveryConfigChangeListener> listeners;
+
+ private ScheduledExecutorService executorService;
+ private FileTime lastReloadTime;
+
+ public AdvancedServiceDiscoveryConfigurationMonitor(GatewayConfig gatewayConfig) {
+ final long monitoringInterval = gatewayConfig.getClouderaManagerAdvancedServiceDiscoveryConfigurationMonitoringInterval();
+ if (monitoringInterval > 0) {
+ this.executorService = newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("AdvancedServiceDiscoveryConfigurationMonitor-%d").build());
+ final Path resourcePath = Paths.get(gatewayConfig.getGatewayConfDir(), ADVANCED_CONFIGURATION_FILE_NAME);
+ executorService.scheduleAtFixedRate(() -> monitorAdvancedServiceConfiguration(resourcePath), 0, monitoringInterval, TimeUnit.MILLISECONDS);
+ LOG.monitorStarted(resourcePath.toString());
+ }
+
+ listeners = new ArrayList<>();
+ }
+
+ public void registerListener(AdvancedServiceDiscoveryConfigChangeListener listener) {
+ listeners.add(listener);
+ }
+
+ private void monitorAdvancedServiceConfiguration(Path resourcePath) {
+ try {
+ if (Files.exists(resourcePath) && Files.isReadable(resourcePath)) {
+ FileTime lastModifiedTime = Files.getLastModifiedTime(resourcePath);
+ if (lastReloadTime == null || lastReloadTime.compareTo(lastModifiedTime) < 0) {
+ lastReloadTime = lastModifiedTime;
+ try (InputStream advanceconfigurationFileInputStream = Files.newInputStream(resourcePath)) {
+ Properties properties = new Properties();
+ properties.load(advanceconfigurationFileInputStream);
+ notifyListeners(properties);
+ }
+ }
+ }
+ } catch (IOException e) {
+ LOG.failedToMonitorClouderaManagerAdvancedConfiguration(e.getMessage(), e);
+ }
+ }
+
+ private void notifyListeners(Properties properties) {
+ LOG.notifyListeners();
+ listeners.forEach(listener -> listener.onAdvancedServiceDiscoveryConfigurationChange(properties));
+ }
+
+}
diff --git a/gateway-cm-integration/src/test/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorParserTest.java b/gateway-cm-integration/src/test/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorParserTest.java
index 84d825f..023d7be 100644
--- a/gateway-cm-integration/src/test/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorParserTest.java
+++ b/gateway-cm-integration/src/test/java/org/apache/knox/gateway/cm/descriptor/ClouderaManagerDescriptorParserTest.java
@@ -25,21 +25,31 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.knox.gateway.topology.discovery.advanced.AdvancedServiceDiscoveryConfig;
import org.apache.knox.gateway.topology.simple.SimpleDescriptor;
import org.apache.knox.gateway.topology.simple.SimpleDescriptor.Application;
import org.apache.knox.gateway.topology.simple.SimpleDescriptor.Service;
+import org.junit.Before;
import org.junit.Test;
public class ClouderaManagerDescriptorParserTest {
+ private ClouderaManagerDescriptorParser cmDescriptorParser;
+
+ @Before
+ public void setUp() {
+ cmDescriptorParser = new ClouderaManagerDescriptorParser();
+ }
+
@Test
- public void testXmlParser() throws Exception {
+ public void testCMDescriptorParser() throws Exception {
final String testConfigPath = this.getClass().getClassLoader().getResource("testDescriptor.xml").getPath();
- final Set<SimpleDescriptor> descriptors = ClouderaManagerDescriptorParser.parse(testConfigPath);
+ final Set<SimpleDescriptor> descriptors = cmDescriptorParser.parse(testConfigPath);
assertEquals(2, descriptors.size());
final Iterator<SimpleDescriptor> descriptorsIterator = descriptors.iterator();
validateTopology1(descriptorsIterator.next());
@@ -47,21 +57,55 @@ public class ClouderaManagerDescriptorParserTest {
}
@Test
- public void testXmlParserWrongDescriptorContent() throws Exception {
+ public void testCMDescriptorParserWrongDescriptorContent() throws Exception {
final String testConfigPath = this.getClass().getClassLoader().getResource("testDescriptorConfigurationWithWrongDescriptor.xml").getPath();
- final Set<SimpleDescriptor> descriptors = ClouderaManagerDescriptorParser.parse(testConfigPath);
+ final Set<SimpleDescriptor> descriptors = cmDescriptorParser.parse(testConfigPath);
assertEquals(1, descriptors.size());
final Iterator<SimpleDescriptor> descriptorsIterator = descriptors.iterator();
validateTopology1(descriptorsIterator.next());
}
@Test
- public void testXmlParserWrongXMLContent() throws Exception {
+ public void testCMDescriptorParserWrongXMLContent() throws Exception {
final String testConfigPath = this.getClass().getClassLoader().getResource("testDescriptorConfigurationWithNonHadoopStyleConfiguration.xml").getPath();
- final Set<SimpleDescriptor> descriptors = ClouderaManagerDescriptorParser.parse(testConfigPath);
+ final Set<SimpleDescriptor> descriptors = cmDescriptorParser.parse(testConfigPath);
assertTrue(descriptors.isEmpty());
}
+ @Test
+ public void testCMDescriptorParserWithNotEnabledServices() throws Exception {
+ final String testConfigPath = this.getClass().getClassLoader().getResource("testDescriptor.xml").getPath();
+ final Properties advancedConfiguration = new Properties();
+ advancedConfiguration.put(AdvancedServiceDiscoveryConfig.PARAMETER_NAME_PREFIX_ENABLED_SERVICE + "HIVE", "false");
+ cmDescriptorParser.onAdvancedServiceDiscoveryConfigurationChange(advancedConfiguration);
+ final Set<SimpleDescriptor> descriptors = cmDescriptorParser.parse(testConfigPath);
+ assertEquals(2, descriptors.size());
+ final Iterator<SimpleDescriptor> descriptorsIterator = descriptors.iterator();
+ SimpleDescriptor descriptor = descriptorsIterator.next();
+ assertNotNull(descriptor);
+ // topology1 comes with HIVE which is disabled
+ assertTrue(descriptor.getServices().isEmpty());
+ }
+
+ @Test
+ public void testCMDescriptorParserWithEnabledNotListedServiceInTopology1() throws Exception {
+ final String testConfigPath = this.getClass().getClassLoader().getResource("testDescriptor.xml").getPath();
+ final Properties advancedConfiguration = new Properties();
+ advancedConfiguration.put(AdvancedServiceDiscoveryConfig.PARAMETER_NAME_PREFIX_ENABLED_SERVICE + "OOZIE", "true");
+ advancedConfiguration.put(AdvancedServiceDiscoveryConfig.PARAMETER_NAME_EXPECTED_TOPOLOGIES, "topology1, topology100");
+ cmDescriptorParser.onAdvancedServiceDiscoveryConfigurationChange(advancedConfiguration);
+ final Set<SimpleDescriptor> descriptors = cmDescriptorParser.parse(testConfigPath);
+ final Iterator<SimpleDescriptor> descriptorsIterator = descriptors.iterator();
+ SimpleDescriptor descriptor = descriptorsIterator.next();
+ assertNotNull(descriptor);
+ // topology1 comes without OOZIE but it's enabled and topology1 is expected -> OOZIE should be added without any url/version/parameter
+ assertService(descriptor, "OOZIE", null, null, null);
+
+ descriptor = descriptorsIterator.next();
+ validateTopology2(descriptor);
+ assertNull(descriptor.getService("OOZIE"));
+ }
+
private void validateTopology1(SimpleDescriptor descriptor) {
assertEquals("topology1", descriptor.getName());
assertEquals("ClouderaManager", descriptor.getDiscoveryType());
@@ -90,7 +134,8 @@ public class ClouderaManagerDescriptorParserTest {
final Map<String, String> expectedServiceParameters = Stream.of(new String[][] { { "httpclient.connectionTimeout", "5m" }, { "httpclient.socketTimeout", "100m" }, })
.collect(Collectors.toMap(data -> data[0], data -> data[1]));
- assertService(descriptor, "HDFS", null, Collections.singletonList("http://localhost:456"), expectedServiceParameters);
+ assertService(descriptor, "ATLAS-API", null, Collections.singletonList("http://localhost:456"), expectedServiceParameters);
+ assertService(descriptor, "NIFI", null, null, null);
}
private void assertApplication(SimpleDescriptor descriptor, String expectedApplicationName, Map<String, String> expectedParams) {
@@ -114,6 +159,8 @@ public class ClouderaManagerDescriptorParserTest {
if (expectedUrls != null) {
assertTrue(service.getURLs().containsAll(expectedUrls));
+ } else {
+ assertNull(service.getURLs());
}
if (expectedParams != null) {
diff --git a/gateway-cm-integration/src/test/resources/testDescriptor.xml b/gateway-cm-integration/src/test/resources/testDescriptor.xml
index c853600..a2593df 100644
--- a/gateway-cm-integration/src/test/resources/testDescriptor.xml
+++ b/gateway-cm-integration/src/test/resources/testDescriptor.xml
@@ -39,9 +39,10 @@ limitations under the License.
discoveryAddress=http://host:456;
cluster=Cluster 2;
providerConfigRef=topology2-provider;
- HDFS:url=http://localhost:456;
- HDFS:httpclient.connectionTimeout=5m;
- HDFS:httpclient.socketTimeout=100m
+ ATLAS-API:url=http://localhost:456;
+ ATLAS-API:httpclient.connectionTimeout=5m;
+ ATLAS-API:httpclient.socketTimeout=100m;
+ NIFI
</value>
</property>
</configuration>
\ No newline at end of file
diff --git a/gateway-cm-integration/src/test/resources/testDescriptorConfigurationWithWrongDescriptor.xml b/gateway-cm-integration/src/test/resources/testDescriptorConfigurationWithWrongDescriptor.xml
index ffa8c25..a249162 100644
--- a/gateway-cm-integration/src/test/resources/testDescriptorConfigurationWithWrongDescriptor.xml
+++ b/gateway-cm-integration/src/test/resources/testDescriptorConfigurationWithWrongDescriptor.xml
@@ -40,7 +40,7 @@ limitations under the License.
cluster=Cluster 2;
providerConfigRef=topology2-provider;
HDFS:url=http://localhost:456;
- HDFS <!-- can not be parsed -->
+ HDFS:noValueParam <!-- can not be parsed -->
</value>
</property>
</configuration>
\ No newline at end of file
diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/GatewayServer.java b/gateway-server/src/main/java/org/apache/knox/gateway/GatewayServer.java
index 82c8bbb..1e7e727 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/GatewayServer.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/GatewayServer.java
@@ -28,6 +28,7 @@ import org.apache.knox.gateway.audit.api.Auditor;
import org.apache.knox.gateway.audit.api.ResourceType;
import org.apache.knox.gateway.audit.log4j.audit.AuditConstants;
import org.apache.knox.gateway.cm.descriptor.ClouderaManagerDescriptorMonitor;
+import org.apache.knox.gateway.cm.descriptor.ClouderaManagerDescriptorParser;
import org.apache.knox.gateway.config.GatewayConfig;
import org.apache.knox.gateway.config.GatewayConfigurationException;
import org.apache.knox.gateway.config.impl.GatewayConfigImpl;
@@ -48,6 +49,7 @@ import org.apache.knox.gateway.topology.Application;
import org.apache.knox.gateway.topology.Topology;
import org.apache.knox.gateway.topology.TopologyEvent;
import org.apache.knox.gateway.topology.TopologyListener;
+import org.apache.knox.gateway.topology.discovery.advanced.AdvancedServiceDiscoveryConfigurationMonitor;
import org.apache.knox.gateway.trace.AccessHandler;
import org.apache.knox.gateway.trace.KnoxErrorHandler;
import org.apache.knox.gateway.trace.TraceHandler;
@@ -622,8 +624,12 @@ public class GatewayServer {
"org.eclipse.jetty.webapp.JettyWebXmlConfiguration",
"org.eclipse.jetty.annotations.AnnotationConfiguration" );
- final ClouderaManagerDescriptorMonitor cmDescriptorMonitor = new ClouderaManagerDescriptorMonitor(config);
+ final ClouderaManagerDescriptorParser cmDescriptorParser = new ClouderaManagerDescriptorParser();
+ final ClouderaManagerDescriptorMonitor cmDescriptorMonitor = new ClouderaManagerDescriptorMonitor(config, cmDescriptorParser);
cmDescriptorMonitor.setupMonitor();
+ final AdvancedServiceDiscoveryConfigurationMonitor advancedServiceDiscoveryConfigurationMonitor = new AdvancedServiceDiscoveryConfigurationMonitor(config);
+ advancedServiceDiscoveryConfigurationMonitor.registerListener(cmDescriptorParser);
+ advancedServiceDiscoveryConfigurationMonitor.registerListener(cmDescriptorMonitor);
// Load the current topologies.
// Redeploy autodeploy topologies.
diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java b/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
index 6c1ccaa..36ba1ee 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
@@ -242,6 +242,8 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
private static final String CLOUDERA_MANAGER_DESCRIPTORS_MONITOR_INTERVAL = GATEWAY_CONFIG_FILE_PREFIX + ".cloudera.manager.descriptors.monitor.interval";
private static final long DEFAULT_CLOUDERA_MANAGER_DESCRIPTORS_MONITOR_INTERVAL = 30000L;
+ private static final String CLOUDERA_MANAGER_ADVANCED_SERVICE_DISCOVERY_CONF_MONITOR_INTERVAL = GATEWAY_CONFIG_FILE_PREFIX + ".cloudera.manager.advanced.service.discovery.config.monitor.interval";
+ private static final long DEFAULT_CLOUDERA_MANAGER_ADVANCED_SERVICE_DISCOVERY_CONF_MONITOR_INTERVAL = 30000L;
public GatewayConfigImpl() {
init();
@@ -1102,4 +1104,9 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
public long getClouderaManagerDescriptorsMonitoringInterval() {
return getLong(CLOUDERA_MANAGER_DESCRIPTORS_MONITOR_INTERVAL, DEFAULT_CLOUDERA_MANAGER_DESCRIPTORS_MONITOR_INTERVAL);
}
+
+ @Override
+ public long getClouderaManagerAdvancedServiceDiscoveryConfigurationMonitoringInterval() {
+ return getLong(CLOUDERA_MANAGER_ADVANCED_SERVICE_DISCOVERY_CONF_MONITOR_INTERVAL, DEFAULT_CLOUDERA_MANAGER_ADVANCED_SERVICE_DISCOVERY_CONF_MONITOR_INTERVAL);
+ }
}
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java b/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
index 94e7642..4aa7192 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
@@ -647,4 +647,9 @@ public interface GatewayConfig {
* @return the monitoring interval (in milliseconds) of Cloudera Manager descriptors
*/
long getClouderaManagerDescriptorsMonitoringInterval();
+
+ /**
+ * @return the monitoring interval (in milliseconds) of Cloudera Manager advanced service discovery configuration
+ */
+ long getClouderaManagerAdvancedServiceDiscoveryConfigurationMonitoringInterval();
}
diff --git a/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java b/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
index 6170550..f7df9a3 100644
--- a/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
+++ b/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
@@ -769,4 +769,9 @@ public class GatewayTestConfig extends Configuration implements GatewayConfig {
public long getClouderaManagerDescriptorsMonitoringInterval() {
return 0;
}
+
+ @Override
+ public long getClouderaManagerAdvancedServiceDiscoveryConfigurationMonitoringInterval() {
+ return 0;
+ }
}
diff --git a/gateway-topology-simple/src/main/java/org/apache/knox/gateway/topology/simple/SimpleDescriptorImpl.java b/gateway-topology-simple/src/main/java/org/apache/knox/gateway/topology/simple/SimpleDescriptorImpl.java
index e8ed738..41e7f9b 100644
--- a/gateway-topology-simple/src/main/java/org/apache/knox/gateway/topology/simple/SimpleDescriptorImpl.java
+++ b/gateway-topology-simple/src/main/java/org/apache/knox/gateway/topology/simple/SimpleDescriptorImpl.java
@@ -20,6 +20,7 @@ package org.apache.knox.gateway.topology.simple;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -122,6 +123,10 @@ public class SimpleDescriptorImpl implements SimpleDescriptor {
services.add(service);
}
+ public void setServices(Collection<Service> services) {
+ this.services = new ArrayList<>(services);
+ }
+
@Override
public List<Service> getServices() {
List<Service> result = new ArrayList<>();