You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2017/02/13 02:13:39 UTC
ignite git commit: IGNITE-4160: working implementation of Kubernetes
IP finder.
Repository: ignite
Updated Branches:
refs/heads/ignite-4159 d949b739d -> b4b924023
IGNITE-4160: working implementation of Kubernetes IP finder.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b4b92402
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b4b92402
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b4b92402
Branch: refs/heads/ignite-4159
Commit: b4b9240231641614af5318ac0f57721dae5dc30c
Parents: d949b73
Author: Denis Magda <dm...@gridgain.com>
Authored: Sun Feb 12 18:13:26 2017 -0800
Committer: Denis Magda <dm...@gridgain.com>
Committed: Sun Feb 12 18:13:26 2017 -0800
----------------------------------------------------------------------
.../TcpDiscoveryKubernetesIpFinder.java | 278 +++++++++++++++++++
.../tcp/ipfinder/kubernetes/package-info.java | 22 ++
pom.xml | 1 +
3 files changed, 301 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b92402/modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/TcpDiscoveryKubernetesIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/TcpDiscoveryKubernetesIpFinder.java b/modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/TcpDiscoveryKubernetesIpFinder.java
new file mode 100644
index 0000000..3c292e8
--- /dev/null
+++ b/modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/TcpDiscoveryKubernetesIpFinder.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes;
+
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.SecureRandom;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Kubernetes Service based IP finder.
+ */
+public class TcpDiscoveryKubernetesIpFinder extends TcpDiscoveryIpFinderAdapter {
+ /** Grid logger. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** Init routine guard. */
+ private final AtomicBoolean initGuard = new AtomicBoolean();
+
+ /** Init routine latch. */
+ private final CountDownLatch initLatch = new CountDownLatch(1);
+
+ /** Trust manager. */
+ private TrustManager[] trustAll = new TrustManager[] {
+ new X509TrustManager() {
+ public void checkServerTrusted(X509Certificate[] certs, String authType) {}
+ public void checkClientTrusted(X509Certificate[] certs, String authType) {}
+ public X509Certificate[] getAcceptedIssuers() { return null; }
+ }
+ };
+
+ /** Host verifier. */
+ private HostnameVerifier trustAllHosts = new HostnameVerifier() {
+ public boolean verify(String hostname, SSLSession session) {
+ return true;
+ }
+ };
+
+ /** Ignite's Kubernetes Service name. */
+ private String serviceName = "ignite";
+
+ /** Ignite Pod namespace name. */
+ private String namespace = "default";
+
+ /** Kubernetes master URL. */
+ private String master = "https://kubernetes.default.svc.cluster.local:443";
+
+ /** Account token location. */
+ private String accountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token";
+
+ /** Kubernets API URL. */
+ private URL url;
+
+ /** SSL context */
+ private SSLContext ctx;
+
+ /**
+ *
+ */
+ public TcpDiscoveryKubernetesIpFinder() {
+ setShared(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException {
+ init();
+
+ Collection<InetSocketAddress> addrs = new ArrayList<>();
+
+ try {
+ System.out.println("Getting Apache Ignite endpoints from: " + url);
+
+ HttpsURLConnection conn = (HttpsURLConnection)url.openConnection();
+
+ conn.setHostnameVerifier(trustAllHosts);
+
+ conn.setSSLSocketFactory(ctx.getSocketFactory());
+ conn.addRequestProperty("Authorization", "Bearer " + serviceAccountToken(accountToken));
+
+ // Sending the request and processing a response.
+ ObjectMapper mapper = new ObjectMapper();
+
+ Endpoints endpoints = mapper.readValue(conn.getInputStream(), Endpoints.class);
+
+ if (endpoints != null) {
+ if (endpoints.subsets != null && !endpoints.subsets.isEmpty()) {
+ for (Subset subset : endpoints.subsets) {
+
+ if (subset.addresses != null && !subset.addresses.isEmpty()) {
+ for (Address address : subset.addresses) {
+ addrs.add(new InetSocketAddress(address.ip, 0));
+
+ System.out.println("Added an address to the list: " + address.ip);
+ }
+ }
+ }
+ }
+ }
+ }
+ catch (Exception e) {
+ throw new IgniteSpiException("Failed to retrieve Ignite pods IP addresses.", e);
+ }
+
+ return addrs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
+ // No-op
+ }
+
+ /**
+ * Sets the name of Ignite's Kubernetes Service where the IP finder will connect to in order to retrieve IP
+ * addresses of existing Ignite pods.
+ *
+ * @param service Ignite's Kubernetes Service name.
+ */
+ public void serviceName(String service) {
+ this.serviceName = service;
+ }
+
+ /**
+ * Sets the namespace name Ignite's Kubernetes Service belongs to.
+ * If it is not set then 'default' is used as the namespace.
+ *
+ * @param namespace Ignite's Kubernetes Service namespace name.
+ */
+ public void namespace(String namespace) {
+ this.namespace = namespace;
+ }
+
+ /**
+ * Sets Kubernetes master's URL. By default 'kubernetes.default.svc' is used.
+ *
+ * @param master URL string of Kubernetes master.
+ */
+ public void masterUrl(String master) {
+ this.master = master;
+ }
+
+ /**
+ * @param accountToken
+ */
+ public void accountToken(String accountToken) {
+ this.accountToken = accountToken;
+ }
+
+ /**
+ * Kubernetes IP finder initalization.
+ *
+ * @throws IgniteSpiException In case of error.
+ */
+ private void init() throws IgniteSpiException {
+ if (initGuard.compareAndSet(false, true)) {
+
+ if (serviceName == null || serviceName.isEmpty() ||
+ namespace == null || namespace.isEmpty() ||
+ master == null || master.isEmpty() ||
+ accountToken == null || accountToken.isEmpty()) {
+ throw new IgniteSpiException(
+ "One or more configuration parameters are invalid [serviceName=" +
+ serviceName + ", namespace=" + namespace + ", masterUrl=" +
+ master + ", accountToken=" + accountToken + "]");
+ }
+
+ try {
+ // Preparing the URL and SSL context to be used for connection purposes.
+ String path = String.format("/api/v1/namespaces/%s/endpoints/%s", namespace, serviceName);
+
+ url = new URL(master + path);
+
+ ctx = SSLContext.getInstance("SSL");
+
+ ctx.init(null, trustAll, new SecureRandom());
+ }
+ catch (Exception e) {
+ throw new IgniteSpiException("Failed to connect to Ignite's Kubernetes Service.", e);
+ }
+ finally {
+ initLatch.countDown();
+ }
+ }
+ else {
+ try {
+ U.await(initLatch);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw new IgniteSpiException("Thread has been interrupted.", e);
+ }
+
+ if (url == null || ctx == null)
+ throw new IgniteSpiException("IP finder has not been initialized properly.");
+ }
+ }
+
+ /**
+ * @param file
+ * @return
+ */
+ private String serviceAccountToken(String file) {
+ try {
+ return new String(Files.readAllBytes(Paths.get(file)));
+ } catch (IOException e) {
+ throw new IgniteSpiException("Failed to load services account token [accountToken= " + file + "]", e);
+ }
+ }
+
+ /**
+ *
+ */
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ private static class Address {
+ /** */
+ public String ip;
+ }
+
+ /**
+ *
+ */
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ private static class Subset {
+ /** */
+ public List<Address> addresses;
+ }
+
+ /**
+ *
+ */
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ private static class Endpoints {
+ /** */
+ public List<Subset> subsets;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b92402/modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/package-info.java
----------------------------------------------------------------------
diff --git a/modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/package-info.java b/modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/package-info.java
new file mode 100644
index 0000000..6b42121
--- /dev/null
+++ b/modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains Google Cloud Storage IP finder.
+ */
+package org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b92402/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ea76053..238361b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
<module>modules/web/ignite-websphere-test</module>
<module>modules/cassandra</module>
<module>modules/flink</module>
+ <module>modules/kubernetes</module>
</modules>
<profiles>